Spark 和 BloomFilter 共享

Spark and BloomFilter sharing

我有一个巨大的 RDD(来源),我需要从中创建一个 BloomFilter 数据,因此后续对用户数据的更新将只考虑 true "diffs",没有重复。

看起来 BloomFilter 的大多数实现都是不可序列化的(虽然可以很容易地修复),但我想要稍微不同的工作流程:

  1. 处理每个分区并为每个分区创建一个适当的 BloomFilter 实例。对于每个 BloomFilter 对象 - 将其写入某处的二进制文件。我实际上不知道如何处理整个分区 - RDD 上有 mapPartition 函数可用,但这需要我 return 一个迭代器。也许我可以使用传递的迭代器,创建 BloomFilter 的实例,将其写入某处并将 return link 作为 Iterator.singleton[PathToFile]?
  2. 创建文件
  3. 在主节点上 - consume 该处理的结果(文件路径列表),读取这些文件并在内存中聚合 BloomFilters。然后将响应写入二进制文件。

我不知道正确的方法:

谢谢!

breeze 实现不是最快的,但它带有通常的 Spark 依赖项,可以与 simple aggregate:

一起使用
import breeze.util.BloomFilter

// Adjust values to fit your case
val numBuckets: Int = 100
val numHashFunctions: Int = 30

val rdd = sc.parallelize(Seq("a", "d", "f", "e", "g", "j", "z", "k"), 4)
val bf = rdd.aggregate(new BloomFilter[String](numBuckets, numHashFunctions))(
  _ += _, _ |= _
)

bf.contains("a")
Boolean = true
bf.contains("n")
Boolean = false

在 Spark 2.0+ 中你可以使用 DataFrameStatFunctions.bloomFilter:

val df = rdd.toDF

val expectedNumItems: Long = 1000 
val fpp: Double = 0.005

val sbf = df.stat.bloomFilter($"value", expectedNumItems, fpp)

sbf.mightContain("a")
Boolean = true
sbf.mightContain("n")
Boolean = false

Algebird 实现也可以工作,并且可以类似于 breeze 实现使用。