Pyspark:如何使用 reduceByKey() 获取唯一值对? (不使用 Distinct() 方法)
Pyspark: How to get a unique value pairs with reduceByKey()? (without using Distinct() method)
我正在尝试从 [1,2,3] 的多个列中获取唯一值对。
数据量很大,有多个文件(总大小约1TB)。
我只想过滤带有 "client" 字符串的行并 grep 每个文件中唯一的 [1,2,3] 列。
我首先使用了元组和 Distinct()
函数,但该过程因 Java 内存错误而停止。
if __name__ == "__main__":
sc=SparkContext(appName="someapp")
cmd = 'hdfs dfs -ls /user/path'.split()
files = subprocess.check_output(cmd).strip().split('\n')
rdds=[]
for ff in files[1:]:
rdd=sc.textFile(ff.split()[-1])
rdd2=rdd.filter(lambda x: "client" in x.lower())
rdd3=rdd2.map(lambda x: tuple(x.split("\t")[y] for y in [1,2,3]))
rdd4=rdd3.distinct()
rdds.append(rdd4)
rdd0=sc.union(rdds)
rdd0.collect()
rdd0.saveAsTextFile('/somedir')
所以我尝试了另一个使用 reduceByKey()
方法的脚本,效果很好。
if __name__ == "__main__":
sc=SparkContext(appName="someapp")
cmd = "hdfs dfs -ls airties/eventU".split()
files = subprocess.check_output(cmd).strip().split('\n')
rdds=[]
for ff in files[1:]:
rdd=sc.textFile(ff.split()[-1])
rdd2=rdd.filter(lambda x: "client" in x.lower())
rdd3=rdd2.map(lambda x: ','.join([x.split("\t")[y] for y in [1,2,3]]))
rdds.append(rdd3)
rdd0=sc.union(rdds)
rddA=rdd0.map(lambda x: (x,1)).reduceByKey(lambda a,b: a+b)
rddA.collect()
rddA.saveAsTextFile('/somedir')
但我试图理解为什么 Distinct()
效果不佳,但 reduceByKey()
方法有效。 distinct()
不是查找唯一值的正确方法吗?
还试图找到是否有更好的方法来优化多个文件的处理,在每个文件中查找唯一值对并聚合它们。当每个文件都包含独占内容时,我只需要对每个文件应用 unique 并在最后一步聚合在一起。但是我当前的代码似乎对系统造成了太多的开销。
数据是这样的:大量冗余
+-----+---+------+----------+
|1 |2 |3 |4 |
+-----+---+------+----------+
| 1| 1| A|2017-01-01|
| 2| 6|client|2017-01-02|
| 2| 3| B|2017-01-02|
| 3| 5| A|2017-01-03|
| 3| 5|client|2017-01-03|
| 2| 2|client|2017-01-02|
| 3| 5| A|2017-01-03|
| 1| 3| B|2017-01-02|
| 3| 5|client|2017-01-03|
| 3| 5|client|2017-01-04|
+-----+---+------+----------+
数据是这样的:大量冗余
+-----+---+------+
|1 |2 |3 |
+-----+---+------+
| 2| 6|client|
| 3| 5|client|
| 2| 2|client|
| 3| 5|client|
| 3| 5|client|
+-----+---+------+
第 3 列是多余的,但仅以此作为示例场景。
distinct
是使用 reduceByKey
实现的。实际上,您只是(几乎)完全按照 currently implemented.
的方式重新实现了 distinct
但是,您的 2 个代码片段不相等。在
第一个片段
- 处理RDD
- 保存 RDD 的不同元素
- 将 RDD 追加到列表中,稍后创建聚合 RDD
它将是第二个片段
- 处理RDD
- 将 RDD 追加到列表中,稍后创建聚合 RDD
- 在聚合 RDD 中保存不同的元素
如果不同文件中有重复的行,它们将在第一个片段中重复,但不会在第二个片段中重复。这可能就是您 运行 内存不足的原因。请注意,此处不需要使用 collect
调用将所有记录带到驱动程序,这会严重影响性能
我正在尝试从 [1,2,3] 的多个列中获取唯一值对。 数据量很大,有多个文件(总大小约1TB)。
我只想过滤带有 "client" 字符串的行并 grep 每个文件中唯一的 [1,2,3] 列。
我首先使用了元组和 Distinct()
函数,但该过程因 Java 内存错误而停止。
if __name__ == "__main__":
sc=SparkContext(appName="someapp")
cmd = 'hdfs dfs -ls /user/path'.split()
files = subprocess.check_output(cmd).strip().split('\n')
rdds=[]
for ff in files[1:]:
rdd=sc.textFile(ff.split()[-1])
rdd2=rdd.filter(lambda x: "client" in x.lower())
rdd3=rdd2.map(lambda x: tuple(x.split("\t")[y] for y in [1,2,3]))
rdd4=rdd3.distinct()
rdds.append(rdd4)
rdd0=sc.union(rdds)
rdd0.collect()
rdd0.saveAsTextFile('/somedir')
所以我尝试了另一个使用 reduceByKey()
方法的脚本,效果很好。
if __name__ == "__main__":
sc=SparkContext(appName="someapp")
cmd = "hdfs dfs -ls airties/eventU".split()
files = subprocess.check_output(cmd).strip().split('\n')
rdds=[]
for ff in files[1:]:
rdd=sc.textFile(ff.split()[-1])
rdd2=rdd.filter(lambda x: "client" in x.lower())
rdd3=rdd2.map(lambda x: ','.join([x.split("\t")[y] for y in [1,2,3]]))
rdds.append(rdd3)
rdd0=sc.union(rdds)
rddA=rdd0.map(lambda x: (x,1)).reduceByKey(lambda a,b: a+b)
rddA.collect()
rddA.saveAsTextFile('/somedir')
但我试图理解为什么 Distinct()
效果不佳,但 reduceByKey()
方法有效。 distinct()
不是查找唯一值的正确方法吗?
还试图找到是否有更好的方法来优化多个文件的处理,在每个文件中查找唯一值对并聚合它们。当每个文件都包含独占内容时,我只需要对每个文件应用 unique 并在最后一步聚合在一起。但是我当前的代码似乎对系统造成了太多的开销。
数据是这样的:大量冗余
+-----+---+------+----------+
|1 |2 |3 |4 |
+-----+---+------+----------+
| 1| 1| A|2017-01-01|
| 2| 6|client|2017-01-02|
| 2| 3| B|2017-01-02|
| 3| 5| A|2017-01-03|
| 3| 5|client|2017-01-03|
| 2| 2|client|2017-01-02|
| 3| 5| A|2017-01-03|
| 1| 3| B|2017-01-02|
| 3| 5|client|2017-01-03|
| 3| 5|client|2017-01-04|
+-----+---+------+----------+
数据是这样的:大量冗余
+-----+---+------+
|1 |2 |3 |
+-----+---+------+
| 2| 6|client|
| 3| 5|client|
| 2| 2|client|
| 3| 5|client|
| 3| 5|client|
+-----+---+------+
第 3 列是多余的,但仅以此作为示例场景。
distinct
是使用 reduceByKey
实现的。实际上,您只是(几乎)完全按照 currently implemented.
distinct
但是,您的 2 个代码片段不相等。在
第一个片段
- 处理RDD
- 保存 RDD 的不同元素
- 将 RDD 追加到列表中,稍后创建聚合 RDD
它将是第二个片段
- 处理RDD
- 将 RDD 追加到列表中,稍后创建聚合 RDD
- 在聚合 RDD 中保存不同的元素
如果不同文件中有重复的行,它们将在第一个片段中重复,但不会在第二个片段中重复。这可能就是您 运行 内存不足的原因。请注意,此处不需要使用 collect
调用将所有记录带到驱动程序,这会严重影响性能