如何在 spark 中并行处理 return 字典?
How to return a dictionary in parallel processing in spark?
我有一个要处理的对象数组:Objects
,我有一个接受字典和对象的函数,returns 相同的字典,修改:
new_dict = modify_object_dict(object_dict, object)
modify_object_dict
执行以下操作:
向字典添加一个关键字,该关键字是处理对象的名称
创建一个字典作为该键的值(字典中的字典),其中添加和删除了元素。
例如,对象可能是一个文件:object_dict['file_name']=sub_dictionary
,子词典可能包含sub_dictionary['file_attribute']=attribute
。
modify_object_dict
填充这些子词典,如上所示,结果是一个包含子词典的词典。
请注意,子词典不会相互影响。即一个对象的字典不与另一个对象的字典交互。
我希望使用 spark 并行处理这些对象:
object_dict = {} # dictionary is initially empty
RDD = (sc.parallelize(Objects)
.map(lambda object: modify_object_dict(object_dict, object))
这是执行此操作的正确方法吗?如果不是,那么 return 每次调用映射函数时修改的字典的正确方法是什么?
what is the correct way to return a dictionary that is modified every time the mapping function is called?
简短的回答是 none。由于每个分区都是单独处理的,因此无法创建具有读/写访问权限的共享对象。 Spark 只支持两种类型的共享变量,累加器和广播,分别具有只写和只读访问权限。
长答案取决于内部到底发生了什么modify_object_dict
。如果您使用的操作是关联的和可交换的,并且可以在键的基础上执行(每个对象都可以映射到特定键上的操作),您可以使用 aggregateByKey
的一些变体。也可以使用 mapPartitions
在本地对数据进行分区和处理。
如果 modify_object_dict
不符合上述条件,那么 Spark 很可能不是一个好的选择。可以将状态推送到外部系统,但通常没有意义,除非 Spark 用于繁重的工作,而你推送到外部的只是最终结果。
此外,您不应该使用 map
来产生副作用。这种情况下的正确方法通常是 foreach
。这里还有一个更微妙的问题。不能保证 map
(或 foreach
就此而言)将只对每个元素执行一次。这意味着您执行的每个操作都必须是幂等的。
编辑:
根据您的描述,您似乎可以尝试以下方法:
首先让我们创建 RDD
一个虚拟对象 class:
class Foobar(object):
def __init__(self, name, x=None, y=None, z=None):
self.name = name
self.x = x
self.y = y
self.z = z
和对象的 RDD:
objects = sc.parallelize([
{"name": "foo", "x": 1}, {"name": "foo", "y": 3},
{"name": "bar", "z": 4}
]).map(lambda x: Foobar(**x))
接下来让我们将其转换为 PairwiseRDD
,名称作为键,对象作为值。如果对象很大,您可以只提取感兴趣的字段并将它们用作值。我假设每个对象都有 name
属性.
pairs = objects.map(lambda obj: (obj.name, obj))
groupByKey
和转换值:
rdd = pairs.groupByKey().mapValues(lambda iter: ...)
或aggregateByKey
(推荐):
def seq_op(obj_dict, obj):
# equivalent to modify_object_dict
# Lets assume it is as simple as this
obj_dict.update((k, getattr(obj, k)) for k in ("x", "y", "z"))
return obj_dict
def comb_op(obj_dict_1, obj_dict_2):
# lets it is a simple union
obj_dict_1.update(obj_dict_2)
return obj_dict_1
dicts = pairs.aggregateByKey({}, seq_op, comb_op)
此刻你有一个 RDD 对 (name, dict)
。它可用于进一步处理,或者如果您确实需要收集为地图的本地结构:
dicts.collectAsMap()
## {'bar': {'x': None, 'y': None, 'z': 4},
## 'foo': {'x': None, 'y': 3, 'z': None}}
我有一个要处理的对象数组:Objects
,我有一个接受字典和对象的函数,returns 相同的字典,修改:
new_dict = modify_object_dict(object_dict, object)
modify_object_dict
执行以下操作:
向字典添加一个关键字,该关键字是处理对象的名称
创建一个字典作为该键的值(字典中的字典),其中添加和删除了元素。
例如,对象可能是一个文件:
object_dict['file_name']=sub_dictionary
,子词典可能包含sub_dictionary['file_attribute']=attribute
。
modify_object_dict
填充这些子词典,如上所示,结果是一个包含子词典的词典。
请注意,子词典不会相互影响。即一个对象的字典不与另一个对象的字典交互。
我希望使用 spark 并行处理这些对象:
object_dict = {} # dictionary is initially empty
RDD = (sc.parallelize(Objects)
.map(lambda object: modify_object_dict(object_dict, object))
这是执行此操作的正确方法吗?如果不是,那么 return 每次调用映射函数时修改的字典的正确方法是什么?
what is the correct way to return a dictionary that is modified every time the mapping function is called?
简短的回答是 none。由于每个分区都是单独处理的,因此无法创建具有读/写访问权限的共享对象。 Spark 只支持两种类型的共享变量,累加器和广播,分别具有只写和只读访问权限。
长答案取决于内部到底发生了什么modify_object_dict
。如果您使用的操作是关联的和可交换的,并且可以在键的基础上执行(每个对象都可以映射到特定键上的操作),您可以使用 aggregateByKey
的一些变体。也可以使用 mapPartitions
在本地对数据进行分区和处理。
如果 modify_object_dict
不符合上述条件,那么 Spark 很可能不是一个好的选择。可以将状态推送到外部系统,但通常没有意义,除非 Spark 用于繁重的工作,而你推送到外部的只是最终结果。
此外,您不应该使用 map
来产生副作用。这种情况下的正确方法通常是 foreach
。这里还有一个更微妙的问题。不能保证 map
(或 foreach
就此而言)将只对每个元素执行一次。这意味着您执行的每个操作都必须是幂等的。
编辑:
根据您的描述,您似乎可以尝试以下方法:
首先让我们创建
RDD
一个虚拟对象 class:class Foobar(object): def __init__(self, name, x=None, y=None, z=None): self.name = name self.x = x self.y = y self.z = z
和对象的 RDD:
objects = sc.parallelize([ {"name": "foo", "x": 1}, {"name": "foo", "y": 3}, {"name": "bar", "z": 4} ]).map(lambda x: Foobar(**x))
接下来让我们将其转换为
PairwiseRDD
,名称作为键,对象作为值。如果对象很大,您可以只提取感兴趣的字段并将它们用作值。我假设每个对象都有name
属性.pairs = objects.map(lambda obj: (obj.name, obj))
groupByKey
和转换值:rdd = pairs.groupByKey().mapValues(lambda iter: ...)
或
aggregateByKey
(推荐):def seq_op(obj_dict, obj): # equivalent to modify_object_dict # Lets assume it is as simple as this obj_dict.update((k, getattr(obj, k)) for k in ("x", "y", "z")) return obj_dict def comb_op(obj_dict_1, obj_dict_2): # lets it is a simple union obj_dict_1.update(obj_dict_2) return obj_dict_1 dicts = pairs.aggregateByKey({}, seq_op, comb_op)
此刻你有一个 RDD 对
(name, dict)
。它可用于进一步处理,或者如果您确实需要收集为地图的本地结构:dicts.collectAsMap() ## {'bar': {'x': None, 'y': None, 'z': 4}, ## 'foo': {'x': None, 'y': 3, 'z': None}}