Pyspark - Aggregation on nested items with multiple conditions

I am using PySpark and I have a DataFrame with the following schema:

 root
     |-- BOOK_ID: integer (nullable = false)
     |-- Chapters: array (nullable = true) 
     |    |-- element: struct (containsNull = true)
     |    |    |-- NAME: string (nullable = true)
     |    |    |-- NUMBER_PAGES: integer (nullable = true)

How can we add a new column named short_chapters that, for each book, sums NUMBER_PAGES over the chapters where NAME.length < 10?

Note: we have a list of chapters per book; is there a way to iterate over it without flattening the DataFrame?

You need to define a user-defined function (udf for short) that filters the chapters. The udf's return type is the same as that of the Chapters column, which we take from the DataFrame's schema.

from pyspark.sql.functions import udf, lit

def filter_short_chapters(chapters, thresh):
    # Keep only the chapters whose page count is below the threshold
    return list(filter(lambda chapter: chapter.NUMBER_PAGES < thresh, chapters))

# Reuse the Chapters column's array<struct> type as the udf's return type
udf_fn = udf(filter_short_chapters, df.schema['Chapters'].dataType)
df = df.withColumn('short_chapters', udf_fn(df.Chapters, lit(10)))
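
The question defines short chapters by NAME length and asks for the page total rather than the filtered list; the same udf pattern can be adapted for that. A minimal sketch, assuming an IntegerType result and a hypothetical helper named sum_short_name_pages:

from pyspark.sql.functions import udf, lit
from pyspark.sql.types import IntegerType

def sum_short_name_pages(chapters, thresh):
    # Sum NUMBER_PAGES over chapters whose NAME is shorter than thresh
    return sum(c.NUMBER_PAGES for c in chapters if len(c.NAME) < thresh)

sum_udf = udf(sum_short_name_pages, IntegerType())
df = df.withColumn('short_chapters', sum_udf(df.Chapters, lit(10)))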

You can use higher-order functions: find all chapters whose name has length < 10 using filter, then add up NUMBER_PAGES for the identified chapters using aggregate to compute short_chapters.

from pyspark.sql import functions as F
from pyspark.sql import Row

df = spark.createDataFrame([("1", [Row(NAME="xs", NUMBER_PAGES=1),
                                   Row(NAME="s", NUMBER_PAGES=5),
                                   Row(NAME="Really Long Name", NUMBER_PAGES=100),
                                   Row(NAME="Really Long Name", NUMBER_PAGES=150), ],), ],
                           'struct<BOOK_ID:string,Chapters:array<struct<NAME:string,NUMBER_PAGES:int>>>')

df.printSchema()

"""
root
 |-- BOOK_ID: string (nullable = true)
 |-- Chapters: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- NUMBER_PAGES: integer (nullable = true)
"""

# Filter for short chapters
short_chapters = F.filter("Chapters", lambda x: F.length(x["NAME"]) < 10)

# Sum number of pages for short chapters
pages_in_short_chapter = F.aggregate(short_chapters, F.lit(0), lambda acc, x: acc + x["NUMBER_PAGES"])

df.withColumn("short_chapters", pages_in_short_chapter).show(truncate=False)

"""
+-------+-------------------------------------------------------------------+--------------+
|BOOK_ID|Chapters                                                           |short_chapters|
+-------+-------------------------------------------------------------------+--------------+
|1      |[{xs, 1}, {s, 5}, {Really Long Name, 100}, {Really Long Name, 150}]|6             |
+-------+-------------------------------------------------------------------+--------------+
"""
df = spark.createDataFrame([("1", [Row(NAME="xs", NUMBER_PAGES=1),
                                   Row(NAME="s", NUMBER_PAGES=5),
                                   Row(NAME="Really Long Name", NUMBER_PAGES=100),
                                   Row(NAME="Really Long Name", NUMBER_PAGES=150), ],), ],
                           'struct<BOOK_ID:string,Chapters:array<struct<NAME:string,NUMBER_PAGES:int>>>')

df.printSchema()

root
 |-- BOOK_ID: string (nullable = true)
 |-- Chapters: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- NAME: string (nullable = true)
 |    |    |-- NUMBER_PAGES: integer (nullable = true)


Add a field to a StructType column and fill it with the desired result, chaining higher-order functions: first filter NUMBER_PAGES through a boolean selection over the chapter names whose length is less than 10, then aggregate the sum of the filtered page counts. Code below:

import pyspark.sql.functions as f

# Wrap the original Chapters array and the computed page total in one struct.
# The SQL expression keeps NUMBER_PAGES[i] wherever length(NAME[i]) < 10,
# then sums the surviving page counts.
df = df.withColumn(
    "Chapters",
    f.struct(
        f.col("Chapters"),
        f.expr("aggregate(filter(Chapters.NUMBER_PAGES, (x, i) -> boolean(transform(Chapters.NAME, x -> length(x) < 10)[i])), 0, (acc, x) -> acc + x)").alias("short_chapters")
    )
)
df.printSchema()
root
 |-- BOOK_ID: string (nullable = true)
 |-- Chapters: struct (nullable = false)
 |    |-- Chapters: array (nullable = true)
 |    |    |-- element: struct (containsNull = true)
 |    |    |    |-- NAME: string (nullable = true)
 |    |    |    |-- NUMBER_PAGES: integer (nullable = true)
 |    |-- short_chapters: integer (nullable = true)

df.show(truncate=False)

+-------+------------------------------------------------------------------------+
|BOOK_ID|Chapters                                                                |
+-------+------------------------------------------------------------------------+
|1      |{[{xs, 1}, {s, 5}, {Really Long Name, 100}, {Really Long Name, 150}], 6}|
+-------+------------------------------------------------------------------------+

df.select('Chapters.*').show(truncate=False)
+-------------------------------------------------------------------+--------------+
|Chapters                                                           |short_chapters|
+-------------------------------------------------------------------+--------------+
|[{xs, 1}, {s, 5}, {Really Long Name, 100}, {Really Long Name, 150}]|6             |
+-------------------------------------------------------------------+--------------+
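
If short_chapters is wanted as a top-level column instead of a nested field, the struct can be unpacked again; a short sketch reusing the names above:

# Pull the nested fields back out into top-level columns
df = (df
      .withColumn("short_chapters", f.col("Chapters.short_chapters"))
      .withColumn("Chapters", f.col("Chapters.Chapters")))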