Spark 和 BloomFilter 共享
Spark and BloomFilter sharing
我有一个巨大的 RDD(来源),我需要从中创建一个 BloomFilter 数据,因此后续对用户数据的更新将只考虑 true "diffs",没有重复。
看起来 BloomFilter 的大多数实现都是不可序列化的(虽然可以很容易地修复),但我想要稍微不同的工作流程:
- 处理每个分区并为每个分区创建一个适当的 BloomFilter 实例。对于每个 BloomFilter 对象 - 将其写入某处的二进制文件。我实际上不知道如何处理整个分区 - RDD 上有
mapPartition
函数可用,但这需要我 return 一个迭代器。也许我可以使用传递的迭代器,创建 BloomFilter 的实例,将其写入某处并将 return link 作为 Iterator.singleton[PathToFile]
? 创建文件
- 在主节点上 -
consume
该处理的结果(文件路径列表),读取这些文件并在内存中聚合 BloomFilters。然后将响应写入二进制文件。
我不知道正确的方法:
- 从传递给
mapPartitions
的函数中在集群支持的 FS 中创建一个文件(它可以是 HDFS、S3N 或本地文件)
- 在第二阶段使用
consume
读取文件的内容(当我有一个包含文件路径的 RDD 时,我必须使用 SparkContext
来读取它们 - 不要'不知道怎么可能)。
谢谢!
breeze
实现不是最快的,但它带有通常的 Spark 依赖项,可以与 simple aggregate
:
一起使用
import breeze.util.BloomFilter
// Adjust values to fit your case
val numBuckets: Int = 100
val numHashFunctions: Int = 30
val rdd = sc.parallelize(Seq("a", "d", "f", "e", "g", "j", "z", "k"), 4)
val bf = rdd.aggregate(new BloomFilter[String](numBuckets, numHashFunctions))(
_ += _, _ |= _
)
bf.contains("a")
Boolean = true
bf.contains("n")
Boolean = false
在 Spark 2.0+ 中你可以使用 DataFrameStatFunctions.bloomFilter
:
val df = rdd.toDF
val expectedNumItems: Long = 1000
val fpp: Double = 0.005
val sbf = df.stat.bloomFilter($"value", expectedNumItems, fpp)
sbf.mightContain("a")
Boolean = true
sbf.mightContain("n")
Boolean = false
Algebird
实现也可以工作,并且可以类似于 breeze
实现使用。
我有一个巨大的 RDD(来源),我需要从中创建一个 BloomFilter 数据,因此后续对用户数据的更新将只考虑 true "diffs",没有重复。
看起来 BloomFilter 的大多数实现都是不可序列化的(虽然可以很容易地修复),但我想要稍微不同的工作流程:
- 处理每个分区并为每个分区创建一个适当的 BloomFilter 实例。对于每个 BloomFilter 对象 - 将其写入某处的二进制文件。我实际上不知道如何处理整个分区 - RDD 上有
mapPartition
函数可用,但这需要我 return 一个迭代器。也许我可以使用传递的迭代器,创建 BloomFilter 的实例,将其写入某处并将 return link 作为Iterator.singleton[PathToFile]
? 创建文件
- 在主节点上 -
consume
该处理的结果(文件路径列表),读取这些文件并在内存中聚合 BloomFilters。然后将响应写入二进制文件。
我不知道正确的方法:
- 从传递给
mapPartitions
的函数中在集群支持的 FS 中创建一个文件(它可以是 HDFS、S3N 或本地文件)
- 在第二阶段使用
consume
读取文件的内容(当我有一个包含文件路径的 RDD 时,我必须使用SparkContext
来读取它们 - 不要'不知道怎么可能)。
谢谢!
breeze
实现不是最快的,但它带有通常的 Spark 依赖项,可以与 simple aggregate
:
import breeze.util.BloomFilter
// Adjust values to fit your case
val numBuckets: Int = 100
val numHashFunctions: Int = 30
val rdd = sc.parallelize(Seq("a", "d", "f", "e", "g", "j", "z", "k"), 4)
val bf = rdd.aggregate(new BloomFilter[String](numBuckets, numHashFunctions))(
_ += _, _ |= _
)
bf.contains("a")
Boolean = true
bf.contains("n")
Boolean = false
在 Spark 2.0+ 中你可以使用 DataFrameStatFunctions.bloomFilter
:
val df = rdd.toDF
val expectedNumItems: Long = 1000
val fpp: Double = 0.005
val sbf = df.stat.bloomFilter($"value", expectedNumItems, fpp)
sbf.mightContain("a")
Boolean = true
sbf.mightContain("n")
Boolean = false
Algebird
实现也可以工作,并且可以类似于 breeze
实现使用。