使用 Python Spark 代码使用连接聚合两个文件的结果

Aggregate the result from two files using Python Spark code using joins

大家好,我是 python 和 spark 的新手,所以我需要大家的帮助。 我有两个文件,我使用 SparkContext

加载了它们
file1 = sc.textFile("hdfs://localhost:9000/data/out1/part-00000")
file2 = sc.textFile("hdfs://localhost:9000/data/out2/part-00000")

文件 1 包含以下数据。

c_id   date         TM        cn_id      c_val  tc
10201   2015-4-15  00:00:00  56707065  0         0
10201   2015-4-15  00:00:00  56707066  1         0
10201   2015-4-15  00:00:00  56707067  200       0

同样有多个 c_id,对于每个 c_id,cn_id 不同,文件 1 中的 c_value 不同,tc 是固定的,即 0.

文件 2 包含以下数据。

c_id   dt          tm        cn_id      c_val  tc
10201   2015-4-15  01:00:00  56707065  300      1
10201   2015-4-15  01:00:00  56707066  60        1
10201   2015-4-15  01:00:00  56707067  20        1

这里 tc 是固定的,即 1

文件一和文件二中的所有值都相同,只有 c_val 根据 cn_id 更改,所以我想要第三个文件包含 c_val 的总和,即对于 c_id 10201 & 对于 cn_id 56707065 我想要这样的结果 10201 2015-4-15 01:00:00 56707065 0+300 =300 所以最后第三个文件中的输出将是,

10201   2015-4-15  01:00:00  56707065 300 1

类似地cn_id 56707066,56707067聚合结果并将其放入第三个文件。请向我推荐 python spark 片段。

我希望在 python spark 中加入这个结果,或者如果在 spark 中使用 pyhton 有任何其他技术。

嗯,我猜你可以加入 cn_id,然后加起来 c_val

你有 2 种简单的方法,一种是用索引压缩每个文件(在 scala-spark 中,我使用 zipWithIndex,并合并结果,groupBy 压缩 id,并通过添加 c_val 列来减少。

你的第二个解决方案是在 scala 中压缩和添加(自动保留索引)我的代码看起来像:

(filea,fileb).zipped.map((x,y)=> ((x.c_val+y.c_val)))

解决方案 3,看来您的 cn_id 也可用于索引正确的对。你可以做: filea.union(fileb).groupBy(_.cn_id) 并通过添加

减少 c_val

我合并了两个文件并制作了单独的文件。它也包含一些浮点数 c_val,我有以下代码

from pyspark import SparkContext
sc = SparkContext("local", "Aggregate")
file1 = sc.textFile("hdfs://localhost:9000/data/parse/combine/joined.txt")
file2 = file1.flatMap(lambda line: line.split(','))\
             .map(lambda x: (x[0] + ',' + x[3], float(x[4])))\
             .reduceByKey(lambda a,b:a+b).coalesce(1)

final_agg = file2.map(lambda x: (x[0]+','+ x[3], float(x[4])))\
                 .reduceByKey(lambda a,b:a+b)\
                 .coalesce(1)
print(file2.collect())

我收到以下错误

ValueError: could not convert string to float:

根据上面的代码,预期的输出是:

[( '10201,56707065',300), ('10201,56707066',61)] 

并且输入文件包含如下数据:

10201,  '2015-4-15',  '00:00:00',  56707065,    '0',    0
10201   '2015-4-15',  '00:00:00',  56707066,    '1',    0
10201   '2015-4-15',  '00:00:00',  56707067,    '200',  0
10201,  '2015-4-15',  '00:30:00',  56707065,    '300',  1
10201   '2015-4-15',  '00:30:00',  56707066,     '60',  1
10201   '2015-4-15',  '00:30:00',  56707067,     '20',  1

任何帮助将不胜感激。 谢谢