将 spark DataFrame 列转换为 python 列表
Convert spark DataFrame column to python list
我处理一个包含两列 mvv 和 count 的数据框。
+---+-----+
|mvv|count|
+---+-----+
| 1 | 5 |
| 2 | 9 |
| 3 | 3 |
| 4 | 1 |
我想获得两个包含 mvv 值和计数值的列表。像
mvv = [1,2,3,4]
count = [5,9,3,1]
所以,我尝试了以下代码: 第一行应该是 return 一个 python 行列表。我想看第一个值:
mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)
但是我在第二行收到一条错误消息:
AttributeError: getInt
看看,为什么你这样做是行不通的。首先,您尝试从 Row 类型中获取整数,您的收集输出如下所示:
>>> mvv_list = mvv_count_df.select('mvv').collect()
>>> mvv_list[0]
Out: Row(mvv=1)
如果你采取这样的方式:
>>> firstvalue = mvv_list[0].mvv
Out: 1
您将获得 mvv
值。如果你想要数组的所有信息,你可以像这样:
>>> mvv_array = [int(row.mvv) for row in mvv_list.collect()]
>>> mvv_array
Out: [1,2,3,4]
但是如果您对另一列尝试相同的操作,您会得到:
>>> mvv_count = [int(row.count) for row in mvv_list.collect()]
Out: TypeError: int() argument must be a string or a number, not 'builtin_function_or_method'
发生这种情况是因为 count
是一个内置方法。并且该列的名称与 count
相同。解决方法是将 count
的列名称更改为 _count
:
>>> mvv_list = mvv_list.selectExpr("mvv as mvv", "count as _count")
>>> mvv_count = [int(row._count) for row in mvv_list.collect()]
但不需要此解决方法,因为您可以使用字典语法访问该列:
>>> mvv_array = [int(row['mvv']) for row in mvv_list.collect()]
>>> mvv_count = [int(row['count']) for row in mvv_list.collect()]
它最终会起作用!
下一页就是你想要的列表。
mvv = mvv_count_df.select("mvv").rdd.flatMap(lambda x: x).collect()
如果出现以下错误:
AttributeError: 'list' object has no attribute 'collect'
此代码将解决您的问题:
mvv_list = mvv_count_df.select('mvv').collect()
mvv_array = [int(i.mvv) for i in mvv_list]
下面的代码可以帮到你
mvv_count_df.select('mvv').rdd.map(lambda row : row[0]).collect()
这将为您提供所有元素的列表。
mvv_list = list(
mvv_count_df.select('mvv').toPandas()['mvv']
)
根据我的数据,我得到了这些基准:
>>> data.select(col).rdd.flatMap(lambda x: x).collect()
0.52 秒
>>> [row[col] for row in data.collect()]
0.271 秒
>>> list(data.select(col).toPandas()[col])
0.427 秒
结果是一样的
我 运行 进行了基准分析,list(mvv_count_df.select('mvv').toPandas()['mvv'])
是最快的方法。我很惊讶。
我 运行 使用 5 节点 i3.xlarge 集群(每个节点有 30.5 GB RAM 和 4 个内核)和 Spark 2.4.5 处理 10 万/1 亿行数据集的不同方法。数据均匀分布在 20 个活泼的单列压缩 Parquet 文件中。
这是基准测试结果(以秒为单位的运行时间):
+-------------------------------------------------------------+---------+-------------+
| Code | 100,000 | 100,000,000 |
+-------------------------------------------------------------+---------+-------------+
| df.select("col_name").rdd.flatMap(lambda x: x).collect() | 0.4 | 55.3 |
| list(df.select('col_name').toPandas()['col_name']) | 0.4 | 17.5 |
| df.select('col_name').rdd.map(lambda row : row[0]).collect()| 0.9 | 69 |
| [row[0] for row in df.select('col_name').collect()] | 1.0 | OOM |
| [r[0] for r in mid_df.select('col_name').toLocalIterator()] | 1.2 | * |
+-------------------------------------------------------------+---------+-------------+
* cancelled after 800 seconds
在驱动程序节点上收集数据时要遵循的黄金法则:
- 尝试用其他方法解决问题。将数据收集到驱动程序节点是昂贵的,不利用 Spark 集群的力量,应尽可能避免。
- 收集尽可能少的行。在收集数据之前聚合、删除重复项、过滤和修剪列。尽可能少地向驱动程序节点发送数据。
toPandas
was significantly improved in Spark 2.3。如果您使用的是 2.3 之前的 Spark 版本,这可能不是最佳方法。
有关详细信息/基准测试结果,请参阅 here。
一个可能的解决方案是使用 pyspark.sql.functions
中的 collect_list()
函数。这会将所有列值聚合到一个 pyspark 数组中,该数组在收集时转换为 python 列表:
mvv_list = df.select(collect_list("mvv")).collect()[0][0]
count_list = df.select(collect_list("count")).collect()[0][0]
让我们创建有问题的数据框
df_test = spark.createDataFrame(
[
(1, 5),
(2, 9),
(3, 3),
(4, 1),
],
['mvv', 'count']
)
df_test.show()
给出
+---+-----+
|mvv|count|
+---+-----+
| 1| 5|
| 2| 9|
| 3| 3|
| 4| 1|
+---+-----+
然后应用rdd.flatMap(f).collect()得到列表
test_list = df_test.select("mvv").rdd.flatMap(list).collect()
print(type(test_list))
print(test_list)
这给出了
<type 'list'>
[1, 2, 3, 4]
尽管有很多答案,但当您需要将列表与 when
和 isin
命令结合使用时,其中一些答案将不起作用。产生扁平值列表的最简单但有效的方法是使用列表理解和 [0]
来避免行名称:
flatten_list_from_spark_df=[i[0] for i in df.select("your column").collect()]
另一种方法是使用熊猫数据框,然后使用list
功能,但它不如this.a
方便和有效
您可以先使用 return 行类型
列表收集 df
row_list = df.select('mvv').collect()
迭代行以转换为列表
sno_id_array = [ int(row.mvv) for row in row_list]
sno_id_array
[1,2,3,4]
使用平面图
sno_id_array = df.select("mvv").rdd.flatMap(lambda x: x).collect()
我处理一个包含两列 mvv 和 count 的数据框。
+---+-----+
|mvv|count|
+---+-----+
| 1 | 5 |
| 2 | 9 |
| 3 | 3 |
| 4 | 1 |
我想获得两个包含 mvv 值和计数值的列表。像
mvv = [1,2,3,4]
count = [5,9,3,1]
所以,我尝试了以下代码: 第一行应该是 return 一个 python 行列表。我想看第一个值:
mvv_list = mvv_count_df.select('mvv').collect()
firstvalue = mvv_list[0].getInt(0)
但是我在第二行收到一条错误消息:
AttributeError: getInt
看看,为什么你这样做是行不通的。首先,您尝试从 Row 类型中获取整数,您的收集输出如下所示:
>>> mvv_list = mvv_count_df.select('mvv').collect()
>>> mvv_list[0]
Out: Row(mvv=1)
如果你采取这样的方式:
>>> firstvalue = mvv_list[0].mvv
Out: 1
您将获得 mvv
值。如果你想要数组的所有信息,你可以像这样:
>>> mvv_array = [int(row.mvv) for row in mvv_list.collect()]
>>> mvv_array
Out: [1,2,3,4]
但是如果您对另一列尝试相同的操作,您会得到:
>>> mvv_count = [int(row.count) for row in mvv_list.collect()]
Out: TypeError: int() argument must be a string or a number, not 'builtin_function_or_method'
发生这种情况是因为 count
是一个内置方法。并且该列的名称与 count
相同。解决方法是将 count
的列名称更改为 _count
:
>>> mvv_list = mvv_list.selectExpr("mvv as mvv", "count as _count")
>>> mvv_count = [int(row._count) for row in mvv_list.collect()]
但不需要此解决方法,因为您可以使用字典语法访问该列:
>>> mvv_array = [int(row['mvv']) for row in mvv_list.collect()]
>>> mvv_count = [int(row['count']) for row in mvv_list.collect()]
它最终会起作用!
下一页就是你想要的列表。
mvv = mvv_count_df.select("mvv").rdd.flatMap(lambda x: x).collect()
如果出现以下错误:
AttributeError: 'list' object has no attribute 'collect'
此代码将解决您的问题:
mvv_list = mvv_count_df.select('mvv').collect()
mvv_array = [int(i.mvv) for i in mvv_list]
下面的代码可以帮到你
mvv_count_df.select('mvv').rdd.map(lambda row : row[0]).collect()
这将为您提供所有元素的列表。
mvv_list = list(
mvv_count_df.select('mvv').toPandas()['mvv']
)
根据我的数据,我得到了这些基准:
>>> data.select(col).rdd.flatMap(lambda x: x).collect()
0.52 秒
>>> [row[col] for row in data.collect()]
0.271 秒
>>> list(data.select(col).toPandas()[col])
0.427 秒
结果是一样的
我 运行 进行了基准分析,list(mvv_count_df.select('mvv').toPandas()['mvv'])
是最快的方法。我很惊讶。
我 运行 使用 5 节点 i3.xlarge 集群(每个节点有 30.5 GB RAM 和 4 个内核)和 Spark 2.4.5 处理 10 万/1 亿行数据集的不同方法。数据均匀分布在 20 个活泼的单列压缩 Parquet 文件中。
这是基准测试结果(以秒为单位的运行时间):
+-------------------------------------------------------------+---------+-------------+
| Code | 100,000 | 100,000,000 |
+-------------------------------------------------------------+---------+-------------+
| df.select("col_name").rdd.flatMap(lambda x: x).collect() | 0.4 | 55.3 |
| list(df.select('col_name').toPandas()['col_name']) | 0.4 | 17.5 |
| df.select('col_name').rdd.map(lambda row : row[0]).collect()| 0.9 | 69 |
| [row[0] for row in df.select('col_name').collect()] | 1.0 | OOM |
| [r[0] for r in mid_df.select('col_name').toLocalIterator()] | 1.2 | * |
+-------------------------------------------------------------+---------+-------------+
* cancelled after 800 seconds
在驱动程序节点上收集数据时要遵循的黄金法则:
- 尝试用其他方法解决问题。将数据收集到驱动程序节点是昂贵的,不利用 Spark 集群的力量,应尽可能避免。
- 收集尽可能少的行。在收集数据之前聚合、删除重复项、过滤和修剪列。尽可能少地向驱动程序节点发送数据。
toPandas
was significantly improved in Spark 2.3。如果您使用的是 2.3 之前的 Spark 版本,这可能不是最佳方法。
有关详细信息/基准测试结果,请参阅 here。
一个可能的解决方案是使用 pyspark.sql.functions
中的 collect_list()
函数。这会将所有列值聚合到一个 pyspark 数组中,该数组在收集时转换为 python 列表:
mvv_list = df.select(collect_list("mvv")).collect()[0][0]
count_list = df.select(collect_list("count")).collect()[0][0]
让我们创建有问题的数据框
df_test = spark.createDataFrame(
[
(1, 5),
(2, 9),
(3, 3),
(4, 1),
],
['mvv', 'count']
)
df_test.show()
给出
+---+-----+
|mvv|count|
+---+-----+
| 1| 5|
| 2| 9|
| 3| 3|
| 4| 1|
+---+-----+
然后应用rdd.flatMap(f).collect()得到列表
test_list = df_test.select("mvv").rdd.flatMap(list).collect()
print(type(test_list))
print(test_list)
这给出了
<type 'list'>
[1, 2, 3, 4]
尽管有很多答案,但当您需要将列表与 when
和 isin
命令结合使用时,其中一些答案将不起作用。产生扁平值列表的最简单但有效的方法是使用列表理解和 [0]
来避免行名称:
flatten_list_from_spark_df=[i[0] for i in df.select("your column").collect()]
另一种方法是使用熊猫数据框,然后使用list
功能,但它不如this.a
您可以先使用 return 行类型
列表收集 dfrow_list = df.select('mvv').collect()
迭代行以转换为列表
sno_id_array = [ int(row.mvv) for row in row_list]
sno_id_array
[1,2,3,4]
使用平面图
sno_id_array = df.select("mvv").rdd.flatMap(lambda x: x).collect()