从 csv 文件向现有的 apache spark 数据框添加数据

Adding data to an existing apache spark dataframe from a csv file

我有一个包含两列的 spark 数据框:姓名、年龄如下:

[Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]

数据框是使用

创建的
sqlContext.createDataFrame()

我接下来需要做的是从外部 'csv' 文件添加第三列 'UserId'。外部文件有几列,但我只需要包含第一列,即 'UserId':

两个数据源中的记录数相同。我在 windows os 上使用独立的 pyspark 版本。最终结果应该是一个包含三列的新数据框:UserId、Name、Age。

有什么建议吗?

您可以通过连接两个数据框来完成此操作,但为此您需要在 booth 表中包含 ID 或其他键。如果行的位置相同,我建议将它复制到 excel 文件,否则你没有足够的信息来合并它们。

我用 pandas 完成了这项工作。它允许以多种不同的方式加入数据框。

1) 我们首先只需要导入那个额外的列(在我们删除 headers 之后,尽管这也可以在导入之后完成)并将其转换为 RDD

from pyspark.sql.types import StringType
from pyspark import SQLContext
sqlContext = SQLContext(sc)
userid_rdd = sc.textFile("C:……/userid.csv").map(lambda line: line.split(","))

2) 将 'userid' RDD 转换为 spark 数据帧

userid_df = userid_rdd.toDF(['userid'])
userid_df.show()

3) 将 'userid' 数据帧转换为 pandas 数据帧

userid_toPandas = userid_df.toPandas()
userid_toPandas

4) 将“预测”数据框(现有数据框)转换为 pandas 数据框

predictions_toPandas = predictions.toPandas() 
predictions_toPandas

5) 使用‘concat’

将两个pandas数据帧合并为一个新的数据帧
import pandas as pd
result = pd.concat([userid_toPandas, predictions_toPandas], axis = 1, ignore_index = True)
result

您可以从 csv 创建一个新的数据框。

    sc = SparkContext.getOrCreate()
    sqlContext = SQLContext(sc)

    # Import the csv file to the SparkSQL table.

    df = sqlContext.read.csv("abc.csv")
    df.createOrReplaceTempView(table_a)

    # Create a new dataframe with only the columns required. In your case only user id
     df_1 = spark.sql("select userid from table_a")

    #Now do a join with the existing dataframe which has the original data. ( [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)] )
    # Lets call the original alice-bob dataframe as df_ori. So,

    df_result = df_ori.join(df_1, how=inner, on= (any column cols if there are any or index row)