在 PySpark 中将字典广播到 rdd

Broadcast a dictionary to rdd in PySpark

我刚刚掌握了 Spark 的窍门,我有一个函数需要映射到一个 rdd,但使用了一个全局字典:

from pyspark import SparkContext

sc = SparkContext('local[*]', 'pyspark')

my_dict = {"a": 1, "b": 2, "c": 3, "d": 4} # at no point will be modified
my_list = ["a", "d", "c", "b"]

def my_func(letter):
    return my_dict[letter]

my_list_rdd = sc.parallelize(my_list)

result = my_list_rdd.map(lambda x: my_func(x)).collect()

print result

以上给出了预期的结果;但是,我真的不确定我对全局变量 my_dict 的使用。似乎每个分区都制作了字典的副本。而且感觉不对..

看起来 broadcast 正是我要找的。但是,当我尝试使用它时:

my_dict_bc = sc.broadcast(my_dict)

def my_func(letter):
    return my_dict_bc[letter] 

我收到以下错误:

TypeError: 'Broadcast' object has no attribute '__getitem__

这似乎暗示我不能广播字典。

我的问题:如果我有一个使用全局字典的函数,它需要映射到 rdd,正确的方法是什么?

我的例子很简单,但实际上my_dictmy_list要大得多,而my_func更复杂。

您忘记了一些关于 Broadcast 对象的重要信息,它们有一个名为 value 的 属性 存储数据的地方。

因此你需要将my_func修改成这样:

my_dict_bc = sc.broadcast(my_dict)

def my_func(letter):
    return my_dict_bc.value[letter] 

正确的做法取决于程序的其余部分如何访问只读共享变量(在您的情况下是字典)。在您描述的情况下,您不需要使用广播变量。来自 the Spark programming guide section on broadcast variables:

Spark automatically broadcasts the common data needed by tasks within each stage. The data broadcasted this way is cached in serialized form and deserialized before running each task. This means that explicitly creating broadcast variables is only useful when tasks across multiple stages need the same data or when caching the data in deserialized form is important.

在你的情况下,如果只在单map阶段需要数据,则不需要显式广播变量(不是"useful")。但是,如果稍后在另一个阶段使用同一字典,那么您可能希望使用广播来避免在每个阶段之前对字典进行序列化和反序列化。