Pyspark:如何使用 reduceByKey() 获取唯一值对? (不使用 Distinct() 方法)

Pyspark: How to get a unique value pairs with reduceByKey()? (without using Distinct() method)

我正在尝试从 [1,2,3] 的多个列中获取唯一值对。 数据量很大,有多个文件(总大小约1TB)。

我只想过滤带有 "client" 字符串的行并 grep 每个文件中唯一的 [1,2,3] 列。 我首先使用了元组和 Distinct() 函数,但该过程因 Java 内存错误而停止。

if __name__ == "__main__":
    sc=SparkContext(appName="someapp")
    cmd = 'hdfs dfs -ls /user/path'.split()
    files = subprocess.check_output(cmd).strip().split('\n')
    rdds=[]
    for ff in files[1:]:
        rdd=sc.textFile(ff.split()[-1])
        rdd2=rdd.filter(lambda x: "client" in x.lower())
        rdd3=rdd2.map(lambda x: tuple(x.split("\t")[y] for y in [1,2,3]))
        rdd4=rdd3.distinct()
        rdds.append(rdd4)

     rdd0=sc.union(rdds)
     rdd0.collect()
     rdd0.saveAsTextFile('/somedir')

所以我尝试了另一个使用 reduceByKey() 方法的脚本,效果很好。

if __name__ == "__main__":
        sc=SparkContext(appName="someapp")
        cmd = "hdfs dfs -ls airties/eventU".split()
        files = subprocess.check_output(cmd).strip().split('\n')
        rdds=[]
        for ff in files[1:]:
                rdd=sc.textFile(ff.split()[-1])
                rdd2=rdd.filter(lambda x: "client" in x.lower())
                rdd3=rdd2.map(lambda x: ','.join([x.split("\t")[y] for y in [1,2,3]]))
                rdds.append(rdd3)
        rdd0=sc.union(rdds)
        rddA=rdd0.map(lambda x: (x,1)).reduceByKey(lambda a,b: a+b)
        rddA.collect()
        rddA.saveAsTextFile('/somedir')

但我试图理解为什么 Distinct() 效果不佳,但 reduceByKey() 方法有效。 distinct() 不是查找唯一值的正确方法吗?

还试图找到是否有更好的方法来优化多个文件的处理,在每个文件中查找唯一值对并聚合它们。当每个文件都包含独占内容时,我只需要对每个文件应用 unique 并在最后一步聚合在一起。但是我当前的代码似乎对系统造成了太多的开销。

数据是这样的:大量冗余

+-----+---+------+----------+
|1    |2  |3     |4         |
+-----+---+------+----------+
|    1|  1|     A|2017-01-01|
|    2|  6|client|2017-01-02|
|    2|  3|     B|2017-01-02|
|    3|  5|     A|2017-01-03|
|    3|  5|client|2017-01-03|
|    2|  2|client|2017-01-02|
|    3|  5|     A|2017-01-03|
|    1|  3|     B|2017-01-02|
|    3|  5|client|2017-01-03|
|    3|  5|client|2017-01-04|
+-----+---+------+----------+

数据是这样的:大量冗余

+-----+---+------+
|1    |2  |3     |
+-----+---+------+
|    2|  6|client|
|    3|  5|client|
|    2|  2|client|
|    3|  5|client|
|    3|  5|client|
+-----+---+------+

第 3 列是多余的,但仅以此作为示例场景。

distinct 是使用 reduceByKey 实现的。实际上,您只是(几乎)完全按照 currently implemented.

的方式重新实现了 distinct

但是,您的 2 个代码片段不相等。在

第一个片段

  • 处理RDD
  • 保存 RDD 的不同元素
  • 将 RDD 追加到列表中,稍后创建聚合 RDD

它将是第二个片段

  • 处理RDD
  • 将 RDD 追加到列表中,稍后创建聚合 RDD
  • 在聚合 RDD 中保存不同的元素

如果不同文件中有重复的行,它们将在第一个片段中重复,但不会在第二个片段中重复。这可能就是您 运行 内存不足的原因。请注意,此处不需要使用 collect 调用将所有记录带到驱动程序,这会严重影响性能