如何将 3 个 DataFrame 加入到一个 DataFrame 中?
How to join 3 DataFrames in to one DataFrame?
我正在尝试将三个不同的 DataFrame 合并为一个,但我在将所有三个 DataFrames 都加入时遇到了问题。我已经可以加入两个了。
如何正确连接三个DataFrame?
Pyspark: 1.6.0
以下是我目前的工作:
# EXPECTED OUTPUT:
# -------file1.csv---------|---file2.csv--|---file3.csv------------|
# |col1|col2|col3|col4|col5|col1|col2|col3|col1|col2|col3|col4|col5|
# Loading in all the files
file1_rdd = sc.textFile("file1.csv").map(lambda line: line.split(","))
file2_rdd = sc.textFile("file2.csv").map(lambda line: line.split(","))
file3_rdd = sc.textFile("file3.csv").map(lambda line: line.split(","))
# Capturing the header
file1_header = file1_rdd.first()
file2_header = file2_rdd.first()
file3_header = file3_rdd.first()
# Removing the header from the table rows
df_file1 = file1_rdd.filter(lambda row : row != file1_header).toDF(file1_header)
df_file2 = file1_rdd.filter(lambda row : row != file2_header).toDF(file2_header)
df_file3 = file1_rdd.filter(lambda row : row != file3_header).toDF(file3_header)
# WORKS: df_file1.join(df_file2, df_file1.col1 == df_file2.col2)
# OUTPUT:
# -------file1.csv---------|---file2.csv--|
# |col1|col2|col3|col4|col5|col1|col2|col3|
# DOES NOT WORK: df_file1.join(df_file2, df_file1.col1 == df_file2.col2).join(df_file3, df_file2.col2 == df_file3.col2)
# OUTPUT:
# Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 4 fields are required while 5 values are provided.
即使字段长度不同,但我可以加入前两个字段而不会出现错误,为什么会出现需要 4 个字段的错误?
问题是 file3.csv
包含未正确清理的数据。为了修复它,我只是像这样强制执行最大拆分:
file3_rdd = sc.textFile("file3.csv").map(lambda line: line.split(",", 3))
对于可能遇到过类似问题的任何阅读者:检查您是否可以首先独立查看表格而没有错误。执行 df_file3.show()
会返回相同的错误,并且会帮助我更快地看到问题。
我正在尝试将三个不同的 DataFrame 合并为一个,但我在将所有三个 DataFrames 都加入时遇到了问题。我已经可以加入两个了。
如何正确连接三个DataFrame?
Pyspark: 1.6.0
以下是我目前的工作:
# EXPECTED OUTPUT:
# -------file1.csv---------|---file2.csv--|---file3.csv------------|
# |col1|col2|col3|col4|col5|col1|col2|col3|col1|col2|col3|col4|col5|
# Loading in all the files
file1_rdd = sc.textFile("file1.csv").map(lambda line: line.split(","))
file2_rdd = sc.textFile("file2.csv").map(lambda line: line.split(","))
file3_rdd = sc.textFile("file3.csv").map(lambda line: line.split(","))
# Capturing the header
file1_header = file1_rdd.first()
file2_header = file2_rdd.first()
file3_header = file3_rdd.first()
# Removing the header from the table rows
df_file1 = file1_rdd.filter(lambda row : row != file1_header).toDF(file1_header)
df_file2 = file1_rdd.filter(lambda row : row != file2_header).toDF(file2_header)
df_file3 = file1_rdd.filter(lambda row : row != file3_header).toDF(file3_header)
# WORKS: df_file1.join(df_file2, df_file1.col1 == df_file2.col2)
# OUTPUT:
# -------file1.csv---------|---file2.csv--|
# |col1|col2|col3|col4|col5|col1|col2|col3|
# DOES NOT WORK: df_file1.join(df_file2, df_file1.col1 == df_file2.col2).join(df_file3, df_file2.col2 == df_file3.col2)
# OUTPUT:
# Caused by: java.lang.IllegalStateException: Input row doesn't have expected number of values required by the schema. 4 fields are required while 5 values are provided.
即使字段长度不同,但我可以加入前两个字段而不会出现错误,为什么会出现需要 4 个字段的错误?
问题是 file3.csv
包含未正确清理的数据。为了修复它,我只是像这样强制执行最大拆分:
file3_rdd = sc.textFile("file3.csv").map(lambda line: line.split(",", 3))
对于可能遇到过类似问题的任何阅读者:检查您是否可以首先独立查看表格而没有错误。执行 df_file3.show()
会返回相同的错误,并且会帮助我更快地看到问题。