Google Dataflow - Exclude one PCollection<String> from another PCollection<String>
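I have two PCollections like the following:
P1 = ['H','E','L','L','O','W','O','R','L','D']
P2 = ['W','E','L','C','O','M','E']
I want to exclude elements from the first collection if they are present in the second collection, giving the following result:
Result = ['H','R','D']
What is an efficient way to do this?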
Use CombinePerKey: https://beam.apache.org/documentation/programming-guide/#combine
- Convert P1 and P2 into tuples that tag each element with its source collection, like this:
Code:
P1 = [('H', 'P1'), ('E', 'P1'), ('L', 'P1'), ('L', 'P1'), ('O', 'P1'), ('W', 'P1'), ('O', 'P1'), ('R', 'P1'), ('L', 'P1'), ('D', 'P1')]
P2 = [('W', 'P2'), ('E', 'P2'), ('L', 'P2'), ('C', 'P2'), ('O', 'P2'), ('M', 'P2'), ('E', 'P2')]
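- Flatten the two PCollections into one.
- Pass the flattened PCollection to CombinePerKey with a CombineFn that records, for each string, whether it appears in P1, in P2, or in both: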
Code:
import apache_beam as beam

class IsInBoth(beam.CombineFn):
    # Accumulator: one flag per source collection, set once that tag is seen.
    def _add_inputs(self, elements, accumulator=None):
        accumulator = accumulator or self.create_accumulator()
        for obj in elements:
            if obj == 'P1':
                accumulator['P1'] = True
            if obj == 'P2':
                accumulator['P2'] = True
        return accumulator

    def create_accumulator(self):
        return {'P1': False, 'P2': False}

    def add_input(self, accumulator, element, *args, **kwargs):
        return self._add_inputs(elements=[element], accumulator=accumulator)

    def add_inputs(self, accumulator, elements, *args, **kwargs):
        return self._add_inputs(elements=elements, accumulator=accumulator)

    def merge_accumulators(self, accumulators, *args, **kwargs):
        # Materialize once so the accumulators can be scanned for both flags.
        accumulators = list(accumulators)
        return {
            'P1': any(i['P1'] for i in accumulators),
            'P2': any(i['P2'] for i in accumulators)}

    def extract_output(self, accumulator, *args, **kwargs):
        return accumulator
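- Filter the output of CombinePerKey, dropping every key whose value is {'P1': True, 'P2': True}. To get exactly Result, also drop keys whose value is {'P1': False, 'P2': True} (elements that occur only in P2, here 'C' and 'M'); in other words, keep only {'P1': True, 'P2': False}.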
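The steps above might be wired together roughly as in the following sketch. It is not the answerer's exact code: instead of a CombineFn subclass it passes a plain function to CombinePerKey (Beam accepts a callable there), and it tags each element with a flag dict rather than a 'P1'/'P2' string so that the function's inputs and outputs have the same shape. The names tag, merge_flags, only_in_p1, exclude and main are made up for illustration, and apache_beam is imported inside the functions so the small helpers can be tried on their own.

```python
def tag(source):
    """Return a function mapping an element to (element, flags for `source`)."""
    def _tag(element):
        return element, {"P1": source == "P1", "P2": source == "P2"}
    return _tag


def merge_flags(flags):
    """Combine flag dicts for one key; also works on its own outputs."""
    flags = list(flags)  # materialize once, the iterable is scanned twice
    return {
        "P1": any(f["P1"] for f in flags),
        "P2": any(f["P2"] for f in flags),
    }


def only_in_p1(kv):
    """Keep keys that were seen in P1 and never in P2."""
    _, seen = kv
    return seen["P1"] and not seen["P2"]


def exclude(p1, p2):
    """Distinct elements of PCollection p1 that do not occur in p2."""
    import apache_beam as beam  # assumption: the Beam Python SDK is installed

    tagged_p1 = p1 | "TagP1" >> beam.Map(tag("P1"))
    tagged_p2 = p2 | "TagP2" >> beam.Map(tag("P2"))
    return (
        (tagged_p1, tagged_p2)
        | "Flatten" >> beam.Flatten()
        | "SeenIn" >> beam.CombinePerKey(merge_flags)
        | "OnlyInP1" >> beam.Filter(only_in_p1)
        | "Keys" >> beam.Keys()
    )


def main():
    import apache_beam as beam

    # Runs on the default local runner; output order is not guaranteed.
    with beam.Pipeline() as pipeline:
        p1 = pipeline | "P1" >> beam.Create(list("HELLOWORLD"))
        p2 = pipeline | "P2" >> beam.Create(list("WELCOME"))
        exclude(p1, p2) | "Print" >> beam.Map(print)
```

Calling main() should print 'H', 'R' and 'D' in some order. Because the combine is per key, duplicates in P1 collapse, so the result holds distinct elements only.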