Google 数据流 - 从另一个 PCollection<String> 中排除一个 PCollection<String>

Google Dataflow - Exclude one PCollection<String> from another PCollection<String>

我有两个如下的P-Collection

P1 = ['H','E','L','L','O','W','O','R','L','D']

P2 = ['W','E','L','C','O','M','E']

我想排除第一个集合中的元素(如果存在),第二个集合中的元素会得到以下结果

Result = ['H','R','D']

执行此操作的优化和快速方法是什么?

使用CombinePerKeyhttps://beam.apache.org/documentation/programming-guide/#combine

Python: https://beam.apache.org/documentation/sdks/pydoc/2.5.0/apache_beam.transforms.core.html?highlight=combineperkey#apache_beam.transforms.core.CombinePerKey

Java: https://beam.apache.org/documentation/sdks/javadoc/2.5.0/org/apache/beam/sdk/transforms/Combine.PerKey.html

  1. 像这样将 P1 和 P2 转换成元组:

代码:

P1 = [('H', 'P1'), ('E', 'P1'), ('L', 'P1'), ('L', 'P1'), ('O', 'P1'), ('W', 'P1'), ('O', 'P1'), ('R', 'P1'), ('L', 'P1'), ('D', 'P1')]

P2 = [('W', 'P2'), ('E', 'P2'), ('L', 'P2'), ('C', 'P2'), ('O', 'P2'), ('M', 'P2'), ('E', 'P2')]
  1. 将 2 个 p 集合拼合在一起

  2. 将扁平化的 p 集合传递到 CombinePerKey,其中 CombineFn 标记字符串是否同时包含 p1 和 p2:

代码:

class IsInBoth(apache_beam.core.CombineFn):
    def _add_inputs(self, elements, accumulator=None):
        accumulator = accumulator or self.create_accumulator()
        for obj in elements:
            if obj == 'P1':
                accumulator['P1'] = True
            if obj == 'P2':
                accumulator['P2'] = True
        return accumulator

    def create_accumulator(self):
        return {'P1': False, 'P2': False}

    def add_input(self, accumulator, element, *args, **kwargs):
        return self._add_inputs(elements=[element], accumulator=accumulator)

    def add_inputs(self, accumulator, elements, *args, **kwargs):
        return self._add_inputs(elements=elements, accumulator=accumulator)

    def merge_accumulators(self, accumulators, *args, **kwargs):
        return {
            'P1': any([i['P1'] for i in accumulators]),
            'P2': any([i['P2'] for i in accumulators])}

    def extract_output(self, accumulator, *args, **kwargs):
        return accumulator
  1. 过滤掉 CombinePerKey 中具有 {'P1': True, 'P2': True}
  2. 的结果