Spark Dataframe 区分具有重复名称的列
Spark Dataframe distinguish columns with duplicated name
据我所知,在 Spark Dataframe 中,多个列可以具有相同的名称,如下面的数据帧快照所示:
[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]
以上结果是通过将数据框连接到自身创建的,您可以看到 4
列包含两个 a
和 f
。
问题是,当我尝试对 a
列进行更多计算时,我无法找到 select 和 a
的方法,我尝试了 [=17] =] 和 df.select('a')
,都在错误消息下方返回给我:
AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.
在 Spark API 中我是否可以再次区分列和重复名称?或者也许可以通过某种方式让我更改列名?
深入研究 Spark API 后,我发现我可以先使用 alias
为原始数据框创建别名,然后使用 withColumnRenamed
手动重命名别名,这将执行 join
而不会导致列名重复。
更多细节可以参考下面Spark Dataframe API:
pyspark.sql.DataFrame.withColumnRenamed
但是,我认为这只是一个麻烦的解决方法,想知道是否有更好的方法来解决我的问题。
我建议您更改 join
的列名称。
df1.select(col("a") as "df1_a", col("f") as "df1_f")
.join(df2.select(col("a") as "df2_a", col("f") as "df2_f"), col("df1_a" === col("df2_a"))
结果 DataFrame
将有 schema
(df1_a, df1_f, df2_a, df2_f)
让我们从一些数据开始:
from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row
df1 = sqlContext.createDataFrame([
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=125231, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])
df2 = sqlContext.createDataFrame([
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])
有几种方法可以解决这个问题。首先,您可以使用父列明确引用子 table 列:
df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)
## +--------------------+
## | f|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
您也可以使用 table 别名:
from pyspark.sql.functions import col
df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")
df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)
## +--------------------+
## | f|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
您终于可以通过编程方式重命名列了:
df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))
df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)
## +--------------------+
## | f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
您可以使用def drop(col: Column)
方法删除重复的列,例如:
DataFrame:df1
+-------+-----+
| a | f |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+
DataFrame:df2
+-------+-----+
| a | f |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+
当我加入 df1 和 df2 时,DataFrame 将如下所示:
val newDf = df1.join(df2,df1("a")===df2("a"))
DataFrame:newDf
+-------+-----+-------+-----+
| a | f | a | f |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+
现在,我们可以使用def drop(col: Column)
方法删除重复的列'a'或'f',如下所示:
val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
假设您要连接的 DataFrame 是 df1 和 df2,并且您在列 'a' 上连接它们,那么您有 2 种方法
方法一
df1.join(df2,'a','left_outer')
这是一个很棒的方法,强烈推荐。
方法二
df1.join(df2,df1.a == df2.a,'left_outer').drop(df2.a)
有一种比为您要加入的所有列编写别名更简单的方法:
df1.join(df2,['a'])
如果您要加入的键在两个表中相同,则此方法有效。
见
https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html
这就是我们如何在 PySpark 中在相同的列名上连接两个数据帧。
df = df1.join(df2, ['col1','col2','col3'])
如果您在此之后执行 printSchema()
,那么您会看到重复的列已被删除。
This might not be the best approach, but if you want to rename the duplicate columns(after join), you can do so using this tiny function.
def rename_duplicate_columns(dataframe):
columns = dataframe.columns
duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2]))
for index in duplicate_column_indices:
columns[index] = columns[index]+'2'
dataframe = dataframe.toDF(*columns)
return dataframe
如果您的用例比 Glennie Helles Sindholt 的回答中描述的更复杂,例如您有 other/few 个相同的非连接列名称,并希望在选择最好使用别名时区分它们,例如:
df3 = df1.select("a", "b").alias("left")\
.join(df2.select("a", "b").alias("right"), ["a"])\
.select("left.a", "left.b", "right.b")
df3.columns
['a', 'b', 'b']
如果两个表中只有键列相同,则尝试使用以下方式(方法 1):
left. join(right , 'key', 'inner')
而不是下面(方法 2):
left. join(right , left.key == right.key, 'inner')
使用方法 1 的优点:
- 'key' 将在最终数据帧中仅显示一次
- 易于使用的语法
使用方法 1 的缺点:
- 只帮助关键列
- 场景,其中left join的情况下,如果打算使用right key null count,这个就不行了。在那种情况下,必须如上所述重命名其中一个键。
什么对我有用
import databricks.koalas as ks
df1k = df1.to_koalas()
df2k = df2.to_koalas()
df3k = df1k.merge(df2k, on=['col1', 'col2'])
df3 = df3k.to_spark()
除了 col1 和 col2 之外的所有列,如果它们来自 df1,则在其名称后附加“_x”,如果它们来自 df2,则附加“_y”,这正是我所需要的。
Pyspark 3.2.1 +
我找到了在 Spark 3.2.1 中使用 toDF
执行此操作的简单方法
df.show()
+------+------+---------+
|number| word| word|
+------+------+---------+
| 1| apple| banana|
| 2|cherry| pear|
| 3| grape|pineapple|
+------+------+---------+
df = df.toDF(*[val + str(i) for i, val in enumerate(df.columns)])
df.show()
+-------+------+---------+
|number0| word1| word2|
+-------+------+---------+
| 1| apple| banana|
| 2|cherry| pear|
| 3| grape|pineapple|
+-------+------+---------+
据我所知,在 Spark Dataframe 中,多个列可以具有相同的名称,如下面的数据帧快照所示:
[
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=125231, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=145831, f=SparseVector(5, {0: 0.0, 1: 0.2356, 2: 0.0036, 3: 0.0, 4: 0.4132})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=147031, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0}), a=149231, f=SparseVector(5, {0: 0.0, 1: 0.0032, 2: 0.2451, 3: 0.0, 4: 0.0042}))
]
以上结果是通过将数据框连接到自身创建的,您可以看到 4
列包含两个 a
和 f
。
问题是,当我尝试对 a
列进行更多计算时,我无法找到 select 和 a
的方法,我尝试了 [=17] =] 和 df.select('a')
,都在错误消息下方返回给我:
AnalysisException: Reference 'a' is ambiguous, could be: a#1333L, a#1335L.
在 Spark API 中我是否可以再次区分列和重复名称?或者也许可以通过某种方式让我更改列名?
深入研究 Spark API 后,我发现我可以先使用 alias
为原始数据框创建别名,然后使用 withColumnRenamed
手动重命名别名,这将执行 join
而不会导致列名重复。
更多细节可以参考下面Spark Dataframe API:
pyspark.sql.DataFrame.withColumnRenamed
但是,我认为这只是一个麻烦的解决方法,想知道是否有更好的方法来解决我的问题。
我建议您更改 join
的列名称。
df1.select(col("a") as "df1_a", col("f") as "df1_f")
.join(df2.select(col("a") as "df2_a", col("f") as "df2_f"), col("df1_a" === col("df2_a"))
结果 DataFrame
将有 schema
(df1_a, df1_f, df2_a, df2_f)
让我们从一些数据开始:
from pyspark.mllib.linalg import SparseVector
from pyspark.sql import Row
df1 = sqlContext.createDataFrame([
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=125231, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0047, 3: 0.0, 4: 0.0043})),
])
df2 = sqlContext.createDataFrame([
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
Row(a=107831, f=SparseVector(
5, {0: 0.0, 1: 0.0, 2: 0.0, 3: 0.0, 4: 0.0})),
])
有几种方法可以解决这个问题。首先,您可以使用父列明确引用子 table 列:
df1.join(df2, df1['a'] == df2['a']).select(df1['f']).show(2)
## +--------------------+
## | f|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
您也可以使用 table 别名:
from pyspark.sql.functions import col
df1_a = df1.alias("df1_a")
df2_a = df2.alias("df2_a")
df1_a.join(df2_a, col('df1_a.a') == col('df2_a.a')).select('df1_a.f').show(2)
## +--------------------+
## | f|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
您终于可以通过编程方式重命名列了:
df1_r = df1.select(*(col(x).alias(x + '_df1') for x in df1.columns))
df2_r = df2.select(*(col(x).alias(x + '_df2') for x in df2.columns))
df1_r.join(df2_r, col('a_df1') == col('a_df2')).select(col('f_df1')).show(2)
## +--------------------+
## | f_df1|
## +--------------------+
## |(5,[0,1,2,3,4],[0...|
## |(5,[0,1,2,3,4],[0...|
## +--------------------+
您可以使用def drop(col: Column)
方法删除重复的列,例如:
DataFrame:df1
+-------+-----+
| a | f |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+
DataFrame:df2
+-------+-----+
| a | f |
+-------+-----+
|107831 | ... |
|107831 | ... |
+-------+-----+
当我加入 df1 和 df2 时,DataFrame 将如下所示:
val newDf = df1.join(df2,df1("a")===df2("a"))
DataFrame:newDf
+-------+-----+-------+-----+
| a | f | a | f |
+-------+-----+-------+-----+
|107831 | ... |107831 | ... |
|107831 | ... |107831 | ... |
+-------+-----+-------+-----+
现在,我们可以使用def drop(col: Column)
方法删除重复的列'a'或'f',如下所示:
val newDfWithoutDuplicate = df1.join(df2,df1("a")===df2("a")).drop(df2("a")).drop(df2("f"))
假设您要连接的 DataFrame 是 df1 和 df2,并且您在列 'a' 上连接它们,那么您有 2 种方法
方法一
df1.join(df2,'a','left_outer')
这是一个很棒的方法,强烈推荐。
方法二
df1.join(df2,df1.a == df2.a,'left_outer').drop(df2.a)
有一种比为您要加入的所有列编写别名更简单的方法:
df1.join(df2,['a'])
如果您要加入的键在两个表中相同,则此方法有效。
见 https://kb.databricks.com/data/join-two-dataframes-duplicated-columns.html
这就是我们如何在 PySpark 中在相同的列名上连接两个数据帧。
df = df1.join(df2, ['col1','col2','col3'])
如果您在此之后执行 printSchema()
,那么您会看到重复的列已被删除。
This might not be the best approach, but if you want to rename the duplicate columns(after join), you can do so using this tiny function.
def rename_duplicate_columns(dataframe):
columns = dataframe.columns
duplicate_column_indices = list(set([columns.index(col) for col in columns if columns.count(col) == 2]))
for index in duplicate_column_indices:
columns[index] = columns[index]+'2'
dataframe = dataframe.toDF(*columns)
return dataframe
如果您的用例比 Glennie Helles Sindholt 的回答中描述的更复杂,例如您有 other/few 个相同的非连接列名称,并希望在选择最好使用别名时区分它们,例如:
df3 = df1.select("a", "b").alias("left")\
.join(df2.select("a", "b").alias("right"), ["a"])\
.select("left.a", "left.b", "right.b")
df3.columns
['a', 'b', 'b']
如果两个表中只有键列相同,则尝试使用以下方式(方法 1):
left. join(right , 'key', 'inner')
而不是下面(方法 2):
left. join(right , left.key == right.key, 'inner')
使用方法 1 的优点:
- 'key' 将在最终数据帧中仅显示一次
- 易于使用的语法
使用方法 1 的缺点:
- 只帮助关键列
- 场景,其中left join的情况下,如果打算使用right key null count,这个就不行了。在那种情况下,必须如上所述重命名其中一个键。
什么对我有用
import databricks.koalas as ks
df1k = df1.to_koalas()
df2k = df2.to_koalas()
df3k = df1k.merge(df2k, on=['col1', 'col2'])
df3 = df3k.to_spark()
除了 col1 和 col2 之外的所有列,如果它们来自 df1,则在其名称后附加“_x”,如果它们来自 df2,则附加“_y”,这正是我所需要的。
Pyspark 3.2.1 +
我找到了在 Spark 3.2.1 中使用 toDF
df.show()
+------+------+---------+
|number| word| word|
+------+------+---------+
| 1| apple| banana|
| 2|cherry| pear|
| 3| grape|pineapple|
+------+------+---------+
df = df.toDF(*[val + str(i) for i, val in enumerate(df.columns)])
df.show()
+-------+------+---------+
|number0| word1| word2|
+-------+------+---------+
| 1| apple| banana|
| 2|cherry| pear|
| 3| grape|pineapple|
+-------+------+---------+