Compare two dataframes in PySpark and change column value

I have two PySpark dataframes like this:

df1:

+--------+-----------+
|language|users_count|
+--------+-----------+
|    Java|      20000|
|  Python|     100000|
|   Scala|       3000|
+--------+-----------+

df2:

+------------+---+
|src_language|abb|
+------------+---+
|        Java|  J|
|      Python|  P|
|       Scala|  S|
+------------+---+

I want to compare the two dataframes and replace the values in df1's language column with the corresponding abb values from df2, so the output would be:

+--------+-----------+
|language|users_count|
+--------+-----------+
|       J|      20000|
|       P|     100000|
|       S|       3000|
+--------+-----------+

How can I achieve this?

You can do this easily with a join - https://spark.apache.org/docs/latest/api/python/reference/api/pyspark.sql.DataFrame.join.html#pyspark.sql.DataFrame.join

Data preparation

import pandas as pd
from io import StringIO
from pyspark.sql import SparkSession, functions as F

spark = SparkSession.builder.getOrCreate()

# The leading spaces inside the CSV literals become part of the
# `language` values, which is why the join below trims both sides.
df1 = pd.read_csv(StringIO("""language,users_count
                Java,20000
                Python,100000
                Scala,3000"""), delimiter=',')

df2 = pd.read_csv(StringIO("""src_language,abb
  Java,J
  Python,P
  Scala,S"""), delimiter=',')

sparkDF1 = spark.createDataFrame(df1)
sparkDF2 = spark.createDataFrame(df2)

sparkDF1.show(truncate=False)

+----------------------+-----------+
|language              |users_count|
+----------------------+-----------+
|                Java  |20000      |
|                Python|100000     |
|                Scala |3000       |
+----------------------+-----------+

sparkDF2.show()

+------------+---+
|src_language|abb|
+------------+---+
|        Java|  J|
|      Python|  P|
|       Scala|  S|
+------------+---+

Join

finalDF = (
    sparkDF1.join(
        sparkDF2,
        F.trim(sparkDF1['language']) == F.trim(sparkDF2['src_language']),
        'inner',
    )
    .select(sparkDF2['abb'].alias('language'), sparkDF1['users_count'])
)

finalDF.show(truncate=False)

+--------+-----------+
|language|users_count|
+--------+-----------+
|S       |3000       |
|P       |100000     |
|J       |20000      |
+--------+-----------+
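Since sparkDF2 is just a small lookup table, you can also add a broadcast hint so Spark ships it to every executor instead of shuffling the larger dataframe. A minimal sketch of the same join using pyspark.sql.functions.broadcast:

from pyspark.sql import functions as F

# Broadcast the small lookup table; the join then avoids shuffling sparkDF1.
finalDF = (
    sparkDF1.join(
        F.broadcast(sparkDF2),
        F.trim(sparkDF1['language']) == F.trim(sparkDF2['src_language']),
        'inner',
    )
    .select(sparkDF2['abb'].alias('language'), sparkDF1['users_count'])
)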

You can simply join the two dataframes and then rename the column to get the required output.

# Sample data
columns = ['language','users_count']
data = [("Java","20000"), ("Python","100000"), ("Scala","3000")]
rdd = spark.sparkContext.parallelize(data)
df = rdd.toDF(columns)

columns1 = ['src_language','abb']
data1 = [("Java","J"), ("Python","P"), ("Scala","S")]
rdd1 = spark.sparkContext.parallelize(data1)
df1 = rdd1.toDF(columns1)

# Join the dataframes and do the required transformation
df2 = (
    df.join(df1, df.language == df1.src_language, "inner")
      .select("abb", "users_count")
      .withColumnRenamed("abb", "language")
)

After calling show() or display() on df2, you can see the output below (row order may vary):

+--------+-----------+
|language|users_count|
+--------+-----------+
|       J|      20000|
|       P|     100000|
|       S|       3000|
+--------+-----------+
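If some languages might be missing from the lookup dataframe, an inner join would silently drop those rows. A minimal sketch of a left join that falls back to the original value, assuming the same df and df1 as above:

from pyspark.sql import functions as F

# Left join keeps rows without a matching abbreviation; coalesce falls
# back to the original language value for those rows.
df2 = (
    df.join(df1, df.language == df1.src_language, "left")
      .select(
          F.coalesce(df1.abb, df.language).alias("language"),
          df.users_count,
      )
)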