使用 Python Spark 代码使用连接聚合两个文件的结果
Aggregate the result from two files using Python Spark code using joins
大家好,我是 python 和 spark 的新手,所以我需要大家的帮助。
我有两个文件,我使用 SparkContext
加载了它们
file1 = sc.textFile("hdfs://localhost:9000/data/out1/part-00000")
file2 = sc.textFile("hdfs://localhost:9000/data/out2/part-00000")
文件 1 包含以下数据。
c_id date TM cn_id c_val tc
10201 2015-4-15 00:00:00 56707065 0 0
10201 2015-4-15 00:00:00 56707066 1 0
10201 2015-4-15 00:00:00 56707067 200 0
同样有多个 c_id,对于每个 c_id,cn_id 不同,文件 1 中的 c_value 不同,tc 是固定的,即 0.
文件 2 包含以下数据。
c_id dt tm cn_id c_val tc
10201 2015-4-15 01:00:00 56707065 300 1
10201 2015-4-15 01:00:00 56707066 60 1
10201 2015-4-15 01:00:00 56707067 20 1
这里 tc 是固定的,即 1
文件一和文件二中的所有值都相同,只有 c_val 根据 cn_id 更改,所以我想要第三个文件包含 c_val 的总和,即对于 c_id 10201
& 对于 cn_id 56707065
我想要这样的结果 10201 2015-4-15 01:00:00 56707065 0+300 =300
所以最后第三个文件中的输出将是,
10201 2015-4-15 01:00:00 56707065 300 1
类似地cn_id 56707066,56707067
聚合结果并将其放入第三个文件。请向我推荐 python spark 片段。
我希望在 python spark 中加入这个结果,或者如果在 spark 中使用 pyhton 有任何其他技术。
嗯,我猜你可以加入 cn_id,然后加起来 c_val
你有 2 种简单的方法,一种是用索引压缩每个文件(在 scala-spark 中,我使用 zipWithIndex,并合并结果,groupBy 压缩 id,并通过添加 c_val 列来减少。
你的第二个解决方案是在 scala 中压缩和添加(自动保留索引)我的代码看起来像:
(filea,fileb).zipped.map((x,y)=> ((x.c_val+y.c_val)))
解决方案 3,看来您的 cn_id 也可用于索引正确的对。你可以做:
filea.union(fileb).groupBy(_.cn_id) 并通过添加
减少 c_val
我合并了两个文件并制作了单独的文件。它也包含一些浮点数 c_val,我有以下代码
from pyspark import SparkContext
sc = SparkContext("local", "Aggregate")
file1 = sc.textFile("hdfs://localhost:9000/data/parse/combine/joined.txt")
file2 = file1.flatMap(lambda line: line.split(','))\
.map(lambda x: (x[0] + ',' + x[3], float(x[4])))\
.reduceByKey(lambda a,b:a+b).coalesce(1)
final_agg = file2.map(lambda x: (x[0]+','+ x[3], float(x[4])))\
.reduceByKey(lambda a,b:a+b)\
.coalesce(1)
print(file2.collect())
我收到以下错误
ValueError: could not convert string to float:
根据上面的代码,预期的输出是:
[( '10201,56707065',300), ('10201,56707066',61)]
并且输入文件包含如下数据:
10201, '2015-4-15', '00:00:00', 56707065, '0', 0
10201 '2015-4-15', '00:00:00', 56707066, '1', 0
10201 '2015-4-15', '00:00:00', 56707067, '200', 0
10201, '2015-4-15', '00:30:00', 56707065, '300', 1
10201 '2015-4-15', '00:30:00', 56707066, '60', 1
10201 '2015-4-15', '00:30:00', 56707067, '20', 1
任何帮助将不胜感激。
谢谢
大家好,我是 python 和 spark 的新手,所以我需要大家的帮助。 我有两个文件,我使用 SparkContext
加载了它们file1 = sc.textFile("hdfs://localhost:9000/data/out1/part-00000")
file2 = sc.textFile("hdfs://localhost:9000/data/out2/part-00000")
文件 1 包含以下数据。
c_id date TM cn_id c_val tc
10201 2015-4-15 00:00:00 56707065 0 0
10201 2015-4-15 00:00:00 56707066 1 0
10201 2015-4-15 00:00:00 56707067 200 0
同样有多个 c_id,对于每个 c_id,cn_id 不同,文件 1 中的 c_value 不同,tc 是固定的,即 0.
文件 2 包含以下数据。
c_id dt tm cn_id c_val tc
10201 2015-4-15 01:00:00 56707065 300 1
10201 2015-4-15 01:00:00 56707066 60 1
10201 2015-4-15 01:00:00 56707067 20 1
这里 tc 是固定的,即 1
文件一和文件二中的所有值都相同,只有 c_val 根据 cn_id 更改,所以我想要第三个文件包含 c_val 的总和,即对于 c_id 10201
& 对于 cn_id 56707065
我想要这样的结果 10201 2015-4-15 01:00:00 56707065 0+300 =300
所以最后第三个文件中的输出将是,
10201 2015-4-15 01:00:00 56707065 300 1
类似地cn_id 56707066,56707067
聚合结果并将其放入第三个文件。请向我推荐 python spark 片段。
我希望在 python spark 中加入这个结果,或者如果在 spark 中使用 pyhton 有任何其他技术。
嗯,我猜你可以加入 cn_id,然后加起来 c_val
你有 2 种简单的方法,一种是用索引压缩每个文件(在 scala-spark 中,我使用 zipWithIndex,并合并结果,groupBy 压缩 id,并通过添加 c_val 列来减少。
你的第二个解决方案是在 scala 中压缩和添加(自动保留索引)我的代码看起来像:
(filea,fileb).zipped.map((x,y)=> ((x.c_val+y.c_val)))
解决方案 3,看来您的 cn_id 也可用于索引正确的对。你可以做: filea.union(fileb).groupBy(_.cn_id) 并通过添加
减少 c_val我合并了两个文件并制作了单独的文件。它也包含一些浮点数 c_val,我有以下代码
from pyspark import SparkContext
sc = SparkContext("local", "Aggregate")
file1 = sc.textFile("hdfs://localhost:9000/data/parse/combine/joined.txt")
file2 = file1.flatMap(lambda line: line.split(','))\
.map(lambda x: (x[0] + ',' + x[3], float(x[4])))\
.reduceByKey(lambda a,b:a+b).coalesce(1)
final_agg = file2.map(lambda x: (x[0]+','+ x[3], float(x[4])))\
.reduceByKey(lambda a,b:a+b)\
.coalesce(1)
print(file2.collect())
我收到以下错误
ValueError: could not convert string to float:
根据上面的代码,预期的输出是:
[( '10201,56707065',300), ('10201,56707066',61)]
并且输入文件包含如下数据:
10201, '2015-4-15', '00:00:00', 56707065, '0', 0
10201 '2015-4-15', '00:00:00', 56707066, '1', 0
10201 '2015-4-15', '00:00:00', 56707067, '200', 0
10201, '2015-4-15', '00:30:00', 56707065, '300', 1
10201 '2015-4-15', '00:30:00', 56707066, '60', 1
10201 '2015-4-15', '00:30:00', 56707067, '20', 1
任何帮助将不胜感激。 谢谢