如何在 spark 中并行处理 return 字典?

How to return a dictionary in parallel processing in spark?

我有一个要处理的对象数组:Objects,我有一个接受字典和对象的函数,returns 相同的字典,修改:

new_dict = modify_object_dict(object_dict, object)

modify_object_dict 执行以下操作:

modify_object_dict 填充这些子词典,如上所示,结果是一个包含子词典的词典。

请注意,子词典不会相互影响。即一个对象的字典不与另一个对象的字典交互。

我希望使用 spark 并行处理这些对象:

object_dict = {}   # dictionary is initially empty
RDD = (sc.parallelize(Objects)
   .map(lambda object: modify_object_dict(object_dict, object))

这是执行此操作的正确方法吗?如果不是,那么 return 每次调用映射函数时修改的字典的正确方法是什么?

what is the correct way to return a dictionary that is modified every time the mapping function is called?

简短的回答是 none。由于每个分区都是单独处理的,因此无法创建具有读/写访问权限的共享对象。 Spark 只支持两种类型的共享变量,累加器和广播,分别具有只写和只读访问权限。

长答案取决于内部到底发生了什么modify_object_dict。如果您使用的操作是关联的和可交换的,并且可以在键的基础上执行(每个对象都可以映射到特定键上的操作),您可以使用 aggregateByKey 的一些变体。也可以使用 mapPartitions 在本地对数据进行分区和处理。

如果 modify_object_dict 不符合上述条件,那么 Spark 很可能不是一个好的选择。可以将状态推送到外部系统,但通常没有意义,除非 Spark 用于繁重的工作,而你推送到外部的只是最终结果。

此外,您不应该使用 map 来产生副作用。这种情况下的正确方法通常是 foreach。这里还有一个更微妙的问题。不能保证 map(或 foreach 就此而言)将只对每个元素执行一次。这意味着您执行的每个操作都必须是幂等的。

编辑:

根据您的描述,您似乎可以尝试以下方法:

  • 首先让我们创建 RDD 一个虚拟对象 class:

    class Foobar(object):
        def __init__(self, name, x=None, y=None, z=None):
            self.name = name
            self.x = x
            self.y = y
            self.z = z
    

    和对象的 RDD:

    objects = sc.parallelize([
        {"name": "foo", "x": 1}, {"name": "foo", "y": 3},
        {"name": "bar", "z": 4}
    ]).map(lambda x: Foobar(**x))
    
  • 接下来让我们将其转换为 PairwiseRDD,名称作为键,对象作为值。如果对象很大,您可以只提取感兴趣的字段并将它们用作值。我假设每个对象都有 name 属性.

    pairs = objects.map(lambda obj: (obj.name, obj))
    
  • groupByKey 和转换值:

    rdd = pairs.groupByKey().mapValues(lambda iter: ...)
    

    aggregateByKey(推荐):

    def seq_op(obj_dict, obj):
        # equivalent to modify_object_dict
        # Lets assume it is as simple as this
        obj_dict.update((k, getattr(obj, k)) for k in ("x", "y", "z"))
        return obj_dict
    
    def comb_op(obj_dict_1, obj_dict_2):
        # lets it is a simple union
        obj_dict_1.update(obj_dict_2)
        return obj_dict_1
    
    dicts = pairs.aggregateByKey({}, seq_op, comb_op)
    
  • 此刻你有一个 RDD 对 (name, dict)。它可用于进一步处理,或者如果您确实需要收集为地图的本地结构:

    dicts.collectAsMap()
    ## {'bar': {'x': None, 'y': None, 'z': 4},
    ##     'foo': {'x': None, 'y': 3, 'z': None}}