Pyspark - Aggregation on nested items with multiple conditions
I'm using Pyspark, and I have a dataframe with the following schema:
root
|-- BOOK_ID: integer (nullable = false)
|-- Chapters: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- NUMBER_PAGES: integer (nullable = true)
How can we add a new column called short_chapters that, for each book, sums NUMBER_PAGES over the chapters whose NAME.length < 10?
Note: Chapters is a list per book; is there a way to iterate over it without flattening the dataframe?
You can define a user defined function (udf for short) that filters the chapters. The return type of the udf is the same as the Chapters column's type, which we take from the schema.
from pyspark.sql.functions import lit, udf

def filter_short_chapters(chapters, thresh):
    # Keep only the chapters whose NAME is shorter than the threshold
    return list(filter(lambda chapter: len(chapter.NAME) < thresh, chapters))

# Reuse the Chapters column's type from the schema as the udf's return type
udf_fn = udf(filter_short_chapters, df.schema['Chapters'].dataType)
df = df.withColumn('short_chapters', udf_fn(df.Chapters, lit(10)))
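If what you ultimately want in short_chapters is the page total rather than the filtered list, a second udf can reduce the filtered array. This is a minimal sketch assuming the schema above; it is not part of the original answer:
from pyspark.sql.types import IntegerType

# Hypothetical follow-up: sum NUMBER_PAGES over the already-filtered chapters
sum_pages_udf = udf(lambda chapters: sum(c.NUMBER_PAGES for c in chapters), IntegerType())
df = df.withColumn('short_chapters', sum_pages_udf(df.short_chapters))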
You can use higher order functions to compute short_chapters: find all chapters whose name has length < 10 using filter, then add up the NUMBER_PAGES of the identified chapters using aggregate.
from pyspark.sql import functions as F
from pyspark.sql import Row
df = spark.createDataFrame([("1", [Row(NAME="xs", NUMBER_PAGES=1),
Row(NAME="s", NUMBER_PAGES=5),
Row(NAME="Really Long Name", NUMBER_PAGES=100),
Row(NAME="Really Long Name", NUMBER_PAGES=150), ],), ],
'struct<BOOK_ID:string,Chapters:array<struct<NAME:string,NUMBER_PAGES:int>>>')
df.printSchema()
"""
root
|-- BOOK_ID: string (nullable = true)
|-- Chapters: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- NUMBER_PAGES: integer (nullable = true)
"""
# Filter for short chapters
short_chapters = F.filter("Chapters", lambda x: F.length(x["NAME"]) < 10)
# Sum number of pages for short chapters
pages_in_short_chapter = F.aggregate(short_chapters, F.lit(0), lambda acc, x: acc + x["NUMBER_PAGES"])
df.withColumn("short_chapters", pages_in_short_chapter).show(truncate=False)
"""
+-------+-------------------------------------------------------------------+--------------+
|BOOK_ID|Chapters |short_chapters|
+-------+-------------------------------------------------------------------+--------------+
|1 |[{xs, 1}, {s, 5}, {Really Long Name, 100}, {Really Long Name, 150}]|6 |
+-------+-------------------------------------------------------------------+--------------+
"""
df = spark.createDataFrame([("1", [Row(NAME="xs", NUMBER_PAGES=1),
Row(NAME="s", NUMBER_PAGES=5),
Row(NAME="Really Long Name", NUMBER_PAGES=100),
Row(NAME="Really Long Name", NUMBER_PAGES=150), ],), ],
'struct<BOOK_ID:string,Chapters:array<struct<NAME:string,NUMBER_PAGES:int>>>')
df.printSchema()
root
|-- BOOK_ID: string (nullable = true)
|-- Chapters: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- NAME: string (nullable = true)
| | |-- NUMBER_PAGES: integer (nullable = true)
Solution
Add a column on top of the StructType column and fill it with the desired result. The result chains Spark SQL higher-order functions: first filter the chapter page counts with a boolean selection built from the chapter names of length < 10, then aggregate the filtered page counts into a sum. Code below:
import pyspark.sql.functions as f

df = df.withColumn(
    "Chapters",
    f.struct(
        f.col("Chapters"),
        # Build a boolean mask from the chapter names (length < 10), filter the
        # page counts by index against that mask, then sum the surviving values
        f.expr("aggregate(filter(Chapters.NUMBER_PAGES, (x, i) -> boolean(transform(Chapters.NAME, x -> length(x) < 10)[i])), 0, (acc, x) -> acc + x)").alias("short_chapters")
    )
)
df.printSchema()
root
|-- BOOK_ID: string (nullable = true)
|-- Chapters: struct (nullable = false)
| |-- Chapters: array (nullable = true)
| | |-- element: struct (containsNull = true)
| | | |-- NAME: string (nullable = true)
| | | |-- NUMBER_PAGES: integer (nullable = true)
| |-- short_chapters: integer (nullable = true)
df.show(truncate=False)
+-------+------------------------------------------------------------------------+
|BOOK_ID|Chapters |
+-------+------------------------------------------------------------------------+
|1 |{[{xs, 1}, {s, 5}, {Really Long Name, 100}, {Really Long Name, 150}], 6}|
+-------+------------------------------------------------------------------------+
df.select('Chapters.*').show(truncate=False)
+-------------------------------------------------------------------+--------------+
|Chapters |short_chapters|
+-------------------------------------------------------------------+--------------+
|[{xs, 1}, {s, 5}, {Really Long Name, 100}, {Really Long Name, 150}]|6 |
+-------------------------------------------------------------------+--------------+
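As a general point of comparison between the answers: the filter/aggregate expressions run entirely inside the JVM, while the udf approach serializes every Chapters array to a Python worker and back, so on large datasets the higher-order-function solutions usually perform noticeably better.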