如何将 rdd 数据插入 pyspark 中的数据框?
How to insert the rdd data into a dataframe in pyspark?
请在下面找到伪代码:
5 列的源数据框
正在使用架构(6 列)创建目标数据框
对于 source_dataframe 中的项目:
#adding a column to the list 购买检查 item.coulmn2
list = [item.column1,item.column2,newcolumn]
#从这个列表中创建一个rdd
#now 我需要将这个 rdd 添加到目标数据框?????
您一定可以更详细地解释您的问题或提供一些示例代码。我很感兴趣其他人将如何解决这个问题。我建议的解决方案是这个:
df = (
sc.parallelize([
(134, "2016-07-02 12:01:40"),
(134, "2016-07-02 12:21:23"),
(125, "2016-07-02 13:22:56"),
(125, "2016-07-02 13:27:07")
]).toDF(["itemid", "timestamp"])
)
rdd = df.map(lambda x: (x[0], x[1], 10))
df2 = rdd.toDF(["itemid", "timestamp", "newCol"])
df3 = df.join(df2, df.itemid == df2.itemid and df.timestamp == df2.timestamp, "inner").drop(df2.itemid).drop(df2.timestamp)
我正在将 RDD 转换为 Dataframe。之后我加入了两个 Dataframes,它复制了一些列。所以最后我删除了那些重复的列。
请在下面找到伪代码:
5 列的源数据框
正在使用架构(6 列)创建目标数据框
对于 source_dataframe 中的项目: #adding a column to the list 购买检查 item.coulmn2 list = [item.column1,item.column2,newcolumn] #从这个列表中创建一个rdd #now 我需要将这个 rdd 添加到目标数据框?????
您一定可以更详细地解释您的问题或提供一些示例代码。我很感兴趣其他人将如何解决这个问题。我建议的解决方案是这个:
df = (
sc.parallelize([
(134, "2016-07-02 12:01:40"),
(134, "2016-07-02 12:21:23"),
(125, "2016-07-02 13:22:56"),
(125, "2016-07-02 13:27:07")
]).toDF(["itemid", "timestamp"])
)
rdd = df.map(lambda x: (x[0], x[1], 10))
df2 = rdd.toDF(["itemid", "timestamp", "newCol"])
df3 = df.join(df2, df.itemid == df2.itemid and df.timestamp == df2.timestamp, "inner").drop(df2.itemid).drop(df2.timestamp)
我正在将 RDD 转换为 Dataframe。之后我加入了两个 Dataframes,它复制了一些列。所以最后我删除了那些重复的列。