Python Spark: How to Map Fields of One RDD to Another RDD
As the title says, I am new to Python Spark and I want to map the fields of one RDD to the fields of another RDD. Here is an example:
rdd1:
c_id name
121210 abc
121211 pqr
rdd2:
c_id cn_id cn_value
121211 0 0
121210 0 1
So for each matching c_id, the c_id should be replaced by the name, keeping the cn_id and the aggregated cn_value. The output should look like this:
abc 0 0
pqr 0 1
from pyspark import SparkContext

sc = SparkContext("local", "spark-App")

# rdd1: lines of "c_id,name"
file1 = sc.textFile('/home/hduser/sample.csv').map(lambda line: line.split(',')).filter(lambda line: len(line) > 1)
# rdd2: lines of "c_id,cn_id,cn_value"
file2 = sc.textFile('hdfs://localhost:9000/sample2/part-00000').map(lambda line: line.split(','))

# (c_id, name) pairs
file1_fields = file1.map(lambda x: (x[0], x[1]))
# (c_id, cn_id, cn_value) tuples
file2_fields = file2.map(lambda x: (x[0], x[1], float(x[2])))
How can I achieve my goal by adding some code here?
Any help would be greatly appreciated.
Thanks
The operation you are looking for is called a join. Given your structure, it is probably best to use DataFrames and spark-csv (I assume the second file is also comma-separated, but has no header). Let's start with some dummy data:
file1 = ...  # path to the first file
file2 = ...  # path to the second file

with open(file1, "w") as fw:
    fw.write("c_id,name\n121210,abc\n121211,pqr")

with open(file2, "w") as fw:
    fw.write("121211,0,0\n121210,0,1")
Read the first file:
df1 = (sqlContext.read
    .format('com.databricks.spark.csv')
    .options(header='true', inferSchema='true')
    .load(file1))
Load the second file:
from pyspark.sql.types import StructType, StructField, LongType

schema = StructType(
    [StructField(x, LongType(), False) for x in ("c_id", "cn_id", "cn_value")])
df2 = (sqlContext.read
    .format('com.databricks.spark.csv')
    .schema(schema)
    .options(header='false')
    .load(file2))
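As a quick sanity check (not part of the original steps), you can display what was loaded; with the dummy data above the second frame should look roughly like this:

df2.show()
## +------+-----+--------+
## |  c_id|cn_id|cn_value|
## +------+-----+--------+
## |121211|    0|       0|
## |121210|    0|       1|
## +------+-----+--------+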
And finally join:
combined = df1.join(df2, df1["c_id"] == df2["c_id"])
combined.show()
## +------+----+------+-----+--------+
## | c_id|name| c_id|cn_id|cn_value|
## +------+----+------+-----+--------+
## |121210| abc|121210| 0| 1|
## |121211| pqr|121211| 0| 0|
## +------+----+------+-----+--------+
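To get exactly the output shape asked for in the question (name, cn_id, cn_value), one possible follow-up (my addition, not part of the original answer) is to select just those columns from the join result, using the source DataFrames to disambiguate the duplicated c_id column:

# Keep only the columns the question asks for
result = combined.select(df1["name"], df2["cn_id"], df2["cn_value"])
result.show()
## +----+-----+--------+
## |name|cn_id|cn_value|
## +----+-----+--------+
## | abc|    0|       1|
## | pqr|    0|       0|
## +----+-----+--------+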
Edit:
With plain RDDs you can do something like this:
file1_fields.join(file2_fields.map(lambda x: (x[0], x[1:])))
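A rough sketch (under the assumption that file1_fields and file2_fields are built exactly as in the question) of what that join produces and how it could be reshaped into the (name, cn_id, cn_value) records you want:

joined = file1_fields.join(file2_fields.map(lambda x: (x[0], x[1:])))
# joined holds records like ('121210', ('abc', ('0', 1.0)))
result = joined.map(lambda kv: (kv[1][0],) + tuple(kv[1][1]))
# result holds records like ('abc', '0', 1.0)
print(result.collect())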