How to concatenate/append multiple Spark dataframes column-wise in Pyspark?
How do I do the equivalent of pandas pd.concat([df1,df2],axis='columns') with Pyspark dataframes? I googled around and did not find a good solution.
DF1
var1
3
4
5
DF2
var2 var3
23 31
44 45
52 53
Expected output dataframe
var1 var2 var3
3 23 31
4 44 45
5 52 53
Edited to include the expected output.
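For context, this is the pandas behaviour the question refers to (a minimal sketch built from the example data above):
import pandas as pd

df1 = pd.DataFrame({"var1": [3, 4, 5]})
df2 = pd.DataFrame({"var2": [23, 44, 52], "var3": [31, 45, 53]})

# Column-wise concatenation: rows are aligned by index, columns are stacked side by side.
print(pd.concat([df1, df2], axis="columns"))
#    var1  var2  var3
# 0     3    23    31
# 1     4    44    45
# 2     5    52    53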
Here is an example of what you want to do, written in Scala; hopefully you can convert it to Pyspark.
import org.apache.spark.sql.{Row, SparkSession}
import org.apache.spark.sql.types.StructType

val spark = SparkSession
  .builder()
  .master("local")
  .appName("ParquetAppendMode")
  .getOrCreate()
import spark.implicits._

val df1 = spark.sparkContext.parallelize(Seq(
  (1, "abc"),
  (2, "def"),
  (3, "hij")
)).toDF("id", "name")

val df2 = spark.sparkContext.parallelize(Seq(
  (19, "x"),
  (29, "y"),
  (39, "z")
)).toDF("age", "address")

// Combine the two schemas, zip the underlying RDDs row by row,
// and rebuild a dataframe from the concatenated rows.
val schema = StructType(df1.schema.fields ++ df2.schema.fields)
val df1df2 = df1.rdd.zip(df2.rdd).map {
  case (rowLeft, rowRight) => Row.fromSeq(rowLeft.toSeq ++ rowRight.toSeq)
}

spark.createDataFrame(df1df2, schema).show()
And this is how you can do it using dataframes only:
import org.apache.spark.sql.functions._

// Give each dataframe a synthetic row_id column, join on it, then drop it.
val ddf1 = df1.withColumn("row_id", monotonically_increasing_id())
val ddf2 = df2.withColumn("row_id", monotonically_increasing_id())

val result = ddf1.join(ddf2, Seq("row_id")).drop("row_id")
result.show()
Add a new column row_id to each dataframe and join the two dataframes using row_id as the key. Hope this helps!
Here is what I did to merge two dataframes column-wise in Pyspark (without a join), building on @Shankar Koirala's answer.
+---+-----+ +-----+----+ +---+-----+-----+----+
| id| name| |secNo|city| | id| name|secNo|city|
+---+-----+ +-----+----+ +---+-----+-----+----+
| 1|sammy| + | 101| LA| => | 1|sammy| 101| LA|
| 2| jill| | 102| CA| | 2| jill| 102| CA|
| 3| john| | 103| DC| | 3| john| 103| DC|
+---+-----+ +-----+----+ +---+-----+-----+----+
Here is my Pyspark code:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType

df1_schema = StructType([StructField("id", IntegerType()), StructField("name", StringType())])
df1 = spark.sparkContext.parallelize([(1, "sammy"), (2, "jill"), (3, "john")])
df1 = spark.createDataFrame(df1, schema=df1_schema)

df2_schema = StructType([StructField("secNo", IntegerType()), StructField("city", StringType())])
df2 = spark.sparkContext.parallelize([(101, "LA"), (102, "CA"), (103, "DC")])
df2 = spark.createDataFrame(df2, schema=df2_schema)

# Combined schema: the columns of df1 followed by the columns of df2.
df3_schema = StructType(df1.schema.fields + df2.schema.fields)

def myFunc(x):
    # x is a pair (row_from_df1, row_from_df2) produced by rdd.zip
    dt1 = x[0]
    dt2 = x[1]
    id = dt1[0]
    name = dt1[1]
    secNo = dt2[0]
    city = dt2[1]
    return [id, name, secNo, city]

rdd_merged = df1.rdd.zip(df2.rdd).map(lambda x: myFunc(x))
df3 = spark.createDataFrame(rdd_merged, schema=df3_schema)
Note that the two tables should have the same number of rows. Thanks, "Shankar Koirala".
Using pyspark, the equivalent of the accepted answer is:
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType

spark = SparkSession.builder.master("local").getOrCreate()

df1 = spark.sparkContext.parallelize([(1, "a"), (2, "b"), (3, "c")]).toDF(["id", "name"])
df2 = spark.sparkContext.parallelize([(7, "x"), (8, "y"), (9, "z")]).toDF(["age", "address"])

# Zip the two RDDs row by row and concatenate each pair of rows into a single tuple.
schema = StructType(df1.schema.fields + df2.schema.fields)
df1df2 = df1.rdd.zip(df2.rdd).map(lambda x: x[0] + x[1])
spark.createDataFrame(df1df2, schema).show()
I spent a couple of hours getting this to work in PySpark; one working solution of mine is below (it is the Python equivalent of @Shankar Koirala's answer, by the way):
from pyspark.sql.functions import monotonically_increasing_id
DF1 = df2.withColumn("row_id", monotonically_increasing_id())
DF2 = df3.withColumn("row_id", monotonically_increasing_id())
result_df = DF1.join(DF2, ("row_id")).drop("row_id")
You simply define a common column for both dataframes and drop it right after merging. I hope this solution helps in cases where the dataframes do not share any common column.
However, keep one detail in mind: this method can pair the rows of the two dataframes nondeterministically, because monotonically_increasing_id does not guarantee that the generated ids line up across dataframes.
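If the pairing has to be deterministic, one workaround (a sketch, not taken from any of the answers above; with_row_index and row_index are illustrative names) is to derive an explicit index from each dataframe's current row order with zipWithIndex and join on that instead of monotonically_increasing_id:
from pyspark.sql.types import StructType, StructField, LongType

def with_row_index(df, spark):
    # zipWithIndex numbers rows in the RDD's current order, so the index is
    # stable for a given dataframe as long as its row order is not changed upstream.
    indexed_schema = StructType(df.schema.fields + [StructField("row_index", LongType(), False)])
    indexed_rdd = df.rdd.zipWithIndex().map(lambda pair: tuple(pair[0]) + (pair[1],))
    return spark.createDataFrame(indexed_rdd, indexed_schema)

result = (with_row_index(df1, spark)
          .join(with_row_index(df2, spark), "row_index")
          .drop("row_index"))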