PySpark - 获取组中每个列表的大小
PySpark - Get the size of each list in group by
我有一个庞大的 pyspark 数据框。我需要将 Person
然后 collect
他们的 Budget
项目分组到一个列表中,以执行进一步的计算。
例如,
a = [('Bob', 562,"Food", "12 May 2018"), ('Bob',880,"Food","01 June 2018"), ('Bob',380,'Household'," 16 June 2018"), ('Sue',85,'Household'," 16 July 2018"), ('Sue',963,'Household'," 16 Sept 2018")]
df = spark.createDataFrame(a, ["Person", "Amount","Budget", "Date"])
分组依据:
import pyspark.sql.functions as F
df_grouped = df.groupby('person').agg(F.collect_list("Budget").alias("data"))
架构:
root
|-- person: string (nullable = true)
|-- data: array (nullable = true)
| |-- element: string (containsNull = true)
但是,当我尝试对每个人应用 UDF 时出现内存错误。如何获得每个人的每个列表 (data
) 的大小(以兆字节或千兆字节为单位)?
我已经完成了以下操作,但我得到了 nulls
import sys
size_list_udf = F.udf(lambda data: sys.getsizeof(data)/1000, DoubleType())
df_grouped = df_grouped.withColumn("size",size_list_udf("data") )
df_grouped.show()
输出:
+------+--------------------+----+
|person| data|size|
+------+--------------------+----+
| Sue|[Household, House...|null|
| Bob|[Food, Food, Hous...|null|
+------+--------------------+----+
您的代码只有一个小问题。 sys.getsizeof()
returns 对象的大小(以字节为整数)。您将其除以整数值 1000
以获得千字节。在 python 2 中,这个 return 是一个整数。但是,您将 udf
定义为 return a DoubleType()
。简单的解决方法是除以 1000.0
.
import sys
size_list_udf = f.udf(lambda data: sys.getsizeof(data)/1000.0, DoubleType())
df_grouped = df_grouped.withColumn("size",size_list_udf("data") )
df_grouped.show(truncate=False)
#+------+-----------------------+-----+
#|person|data |size |
#+------+-----------------------+-----+
#|Sue |[Household, Household] |0.112|
#|Bob |[Food, Food, Household]|0.12 |
#+------+-----------------------+-----+
我发现在 udf
是 returning null
的情况下,罪魁祸首经常是类型不匹配。
我有一个庞大的 pyspark 数据框。我需要将 Person
然后 collect
他们的 Budget
项目分组到一个列表中,以执行进一步的计算。
例如,
a = [('Bob', 562,"Food", "12 May 2018"), ('Bob',880,"Food","01 June 2018"), ('Bob',380,'Household'," 16 June 2018"), ('Sue',85,'Household'," 16 July 2018"), ('Sue',963,'Household'," 16 Sept 2018")]
df = spark.createDataFrame(a, ["Person", "Amount","Budget", "Date"])
分组依据:
import pyspark.sql.functions as F
df_grouped = df.groupby('person').agg(F.collect_list("Budget").alias("data"))
架构:
root
|-- person: string (nullable = true)
|-- data: array (nullable = true)
| |-- element: string (containsNull = true)
但是,当我尝试对每个人应用 UDF 时出现内存错误。如何获得每个人的每个列表 (data
) 的大小(以兆字节或千兆字节为单位)?
我已经完成了以下操作,但我得到了 nulls
import sys
size_list_udf = F.udf(lambda data: sys.getsizeof(data)/1000, DoubleType())
df_grouped = df_grouped.withColumn("size",size_list_udf("data") )
df_grouped.show()
输出:
+------+--------------------+----+
|person| data|size|
+------+--------------------+----+
| Sue|[Household, House...|null|
| Bob|[Food, Food, Hous...|null|
+------+--------------------+----+
您的代码只有一个小问题。 sys.getsizeof()
returns 对象的大小(以字节为整数)。您将其除以整数值 1000
以获得千字节。在 python 2 中,这个 return 是一个整数。但是,您将 udf
定义为 return a DoubleType()
。简单的解决方法是除以 1000.0
.
import sys
size_list_udf = f.udf(lambda data: sys.getsizeof(data)/1000.0, DoubleType())
df_grouped = df_grouped.withColumn("size",size_list_udf("data") )
df_grouped.show(truncate=False)
#+------+-----------------------+-----+
#|person|data |size |
#+------+-----------------------+-----+
#|Sue |[Household, Household] |0.112|
#|Bob |[Food, Food, Household]|0.12 |
#+------+-----------------------+-----+
我发现在 udf
是 returning null
的情况下,罪魁祸首经常是类型不匹配。