PySpark - Is there a way to join two dataframes horizontally so that each row in the first df has all rows in the second df
So I have a users df with unique user_ids and a second df with a set of questions. I then want to merge the dfs so that each user_id is attached to the full set of questions:
Users Df:
+--------------------------+
|user_id |
+--------------------------+
|GDDVWWIOOKDY4WWBCICM4VOQHQ|
|77VC23NYEWLGHVVS4UMHJEVESU|
|VCOX7HUHTMPFCUOGYWGL4DMIRI|
|XPJBJMABYXLTZCKSONJVBCOXQM|
|QHTPQSFNOA5YEWH6N7FREBMMDM|
|JLQNBYCSC4DGCOHNLRBK5UANWI|
|RWYUOLBKIQMZVYHZJYCQ7SGTKA|
|CR33NGPK2GKK6G35SLZB7TGIJE|
|N6K7URSGH65T5UT6PZHMN62E2U|
|SZMPG3FQQOHGDV23UVXODTQETE|
+--------------------------+
Questions Df:
+--------------------+-------------------+-----------------+--------------------+
| category_type| category_subject| question_id| question|
+--------------------+-------------------+-----------------+--------------------+
|Consumer & Lifestyle| Dietary Habits|pdl_diet_identity|Eating habits des...|
|Consumer & Lifestyle| Dietary Habits|pdl_diet_identity|Eating habits des...|
|Consumer & Lifestyle| Dietary Habits|pdl_diet_identity|Eating habits des...|
|Consumer & Lifestyle| Dietary Habits|pdl_diet_identity|Eating habits des...|
|Consumer & Lifestyle| Dietary Habits|pdl_diet_identity|Eating habits des...|
|Consumer & Lifestyle| Dietary Habits|pdl_diet_identity|Eating habits des...|
|Consumer & Lifestyle| Dietary Habits|pdl_diet_identity|Eating habits des...|
| Demographics|Social Demographics|pdl_ethnicity_new| Ethnicity|
| Demographics|Social Demographics|pdl_ethnicity_new| Ethnicity|
| Demographics|Social Demographics|pdl_ethnicity_new| Ethnicity|
+--------------------+-------------------+-----------------+--------------------+
So what I am doing now is turning the user_ids into a list and looping through them, creating a new column on the questions df for each one and making a temp df from the result. I then union that into a final df that accumulates the result of each user_id iteration, like so:
Create the user_id list:
from pyspark.sql import functions as f

unique_users_list = users_df \
    .select("user_id") \
    .agg(f.collect_list("user_id")).collect()[0][0]
Create an empty final df to append to:
from pyspark.sql.types import StructType, StructField, StringType

finaldf_schema = StructType([
    StructField("category_type", StringType(), False),
    StructField("category_subject", StringType(), False),
    StructField("question_id", StringType(), False),
    StructField("question", StringType(), False),
    StructField("user_id", StringType(), False)
])

final_df = spark.createDataFrame([], finaldf_schema)
Then loop over the user_ids and union each result into the final df:
for user_id in unique_users_list:
    temp_df = questions_df.withColumn("user_id", f.lit(user_id))
    final_df = final_df.union(temp_df)
However, I am finding the performance very slow. Is there a more efficient and faster way of doing this?
Thanks
What you are looking for is a Cartesian product. You can use pyspark.sql.DataFrame.crossJoin() to achieve this.
Try:
final_df = users_df.crossJoin(questions_df)
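This one-liner replaces the whole loop: instead of building a plan with one union per user on the driver, Spark performs a single cross join. Below is a minimal, self-contained sketch of that approach; the tiny sample rows are made up purely for illustration, and the final select() is only there to reorder the columns into the same order as your original finaldf_schema.

from pyspark.sql import SparkSession
from pyspark.sql import functions as f

spark = SparkSession.builder.getOrCreate()

# Made-up sample data, just to make the sketch runnable.
users_df = spark.createDataFrame(
    [("GDDVWWIOOKDY4WWBCICM4VOQHQ",), ("77VC23NYEWLGHVVS4UMHJEVESU",)],
    ["user_id"],
)
questions_df = spark.createDataFrame(
    [("Demographics", "Social Demographics", "pdl_ethnicity_new", "Ethnicity")],
    ["category_type", "category_subject", "question_id", "question"],
)

# Every user_id gets paired with every question row.
final_df = users_df.crossJoin(questions_df) \
    .select("category_type", "category_subject", "question_id", "question", "user_id")

final_df.show(truncate=False)

If one side stays small (here, likely the questions df), wrapping it in f.broadcast(), e.g. users_df.crossJoin(f.broadcast(questions_df)), can avoid shuffling the larger side; whether that helps depends on your data sizes.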