使用 Apache Beam 的 Python SDK 查找具有最大标记数的字符串
Find string with max number of tokens using Apache Beam's Python SDK
我有一个包含字符串的 PCollection。我想按空格拆分每个字符串，找出标记（token）数量最多的那个标记列表，并把它的大小以 int 的形式存储在一个变量中。
考虑这个示例输入:
# Sample input: three sentences that split into 5, 2 and 3 tokens.
sentences = [
    'This is the first sentence',
    'Second sentence',
    'Yet another sentence',
]

# Turn the in-memory list into a PCollection with one element per sentence.
with beam.Pipeline(options=PipelineOptions()) as p:
    pcoll = p | 'Create' >> beam.Create(sentences)
拆分后的句子为:
['This', 'is', 'the', 'first', 'sentence'] -> 5
['Second', 'sentence'] -> 2
['Yet', 'another', 'sentence'] -> 3
我想将值 5 存储在一个变量中。
我该怎么做？我看到了这篇博客文章（this blogpost），但它并不完全符合我的需求：作者只是打印了结果 PCollection，而我希望稍后在管道的其他阶段使用这个值。
您可以使用 Top.Of 转换来做到这一点。简而言之，我们先拆分每个句子，再计算每个句子的标记（token）数量。由于只需要排名第一的结果，我们让 Top.Of 只返回 1 个元素，并传入一个 lambda 函数作为比较依据，按标记数量对句子进行排序：
# Sample input; each sentence is split on single spaces below.
sentences = ['This is the first sentence',
             'Second sentence',
             'Yet another sentence']

# `p` is the beam.Pipeline these steps are attached to (created elsewhere).
longest_sentence = (
    p
    | 'Read Sentences' >> beam.Create(sentences)
    | 'Split into Words' >> beam.Map(lambda x: x.split(' '))
    # Pair every token list with its length: (tokens, token_count).
    | 'Map Token Length' >> beam.Map(lambda x: (x, len(x)))
    # Current Beam releases rank with `key=` instead of a two-argument
    # compare callable, so order the pairs by their token count.
    | 'Top Sentence' >> combine.Top.Of(1, key=lambda pair: pair[1])
    | 'Save Variable' >> beam.ParDo(SaveMaxFn()))
其中 SaveMaxFn 的定义如下：
class SaveMaxFn(beam.DoFn):
    """Store the winning token count in the module-level ``length`` variable.

    NOTE: the global lives in whichever process executes this DoFn. On a
    distributed runner that is a worker, not the launching program, so pass
    the value to later stages as a side input rather than reading the global.
    """

    def process(self, element):
        """Record and log the token count of the top-ranked sentence.

        Args:
            element: The list produced by ``Top.Of(1)``; its single entry is
                a ``(tokens, token_count)`` pair.

        Returns:
            ``element`` unchanged, so each entry is emitted downstream.
        """
        # Without this declaration the assignment below would only create a
        # local name that vanishes when process() returns.
        global length
        if not element:
            # The top-N list can be empty when there was nothing to rank;
            # indexing it would raise IndexError.
            logging.warning("No sentences to rank")
            return element
        length = element[0][1]
        logging.info("Longest sentence: %s token(s)", length)
        return element
而 length 是一个全局变量：
# Only has an effect inside the function that assigns `length`; at module
# level the statement is a no-op.
global length
结果:
INFO:root:Longest sentence: 5 token(s)
完整代码:
import argparse, logging
import apache_beam as beam
import apache_beam.transforms.combiners as combine
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class SaveMaxFn(beam.DoFn):
    """Store the winning token count in the module-level ``length`` variable.

    NOTE: the global lives in whichever process executes this DoFn. On a
    distributed runner that is a worker, not the launching program, so pass
    the value to later stages as a side input rather than reading the global.
    """

    def process(self, element):
        """Record and log the token count of the top-ranked sentence.

        Args:
            element: The list produced by ``Top.Of(1)``; its single entry is
                a ``(tokens, token_count)`` pair.

        Returns:
            ``element`` unchanged, so each entry is emitted downstream.
        """
        # Without this declaration the assignment below would only create a
        # local name that vanishes when process() returns.
        global length
        if not element:
            # The top-N list can be empty when there was nothing to rank;
            # indexing it would raise IndexError.
            logging.warning("No sentences to rank")
            return element
        length = element[0][1]
        logging.info("Longest sentence: %s token(s)", length)
        return element
def run(argv=None):
    """Build and run the pipeline that finds the sentence with the most tokens.

    Args:
        argv: Command-line arguments. Anything argparse does not recognise
            is forwarded to Beam as pipeline options; ``None`` lets argparse
            fall back to ``sys.argv``.
    """
    parser = argparse.ArgumentParser()
    # No script-specific flags are defined, so only the leftovers matter.
    _, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=pipeline_options)

    sentences = ['This is the first sentence',
                 'Second sentence',
                 'Yet another sentence']

    longest_sentence = (
        p
        | 'Read Sentences' >> beam.Create(sentences)
        | 'Split into Words' >> beam.Map(lambda x: x.split(' '))
        # Pair every token list with its length: (tokens, token_count).
        | 'Map Token Length' >> beam.Map(lambda x: (x, len(x)))
        # Current Beam releases rank with `key=` instead of a two-argument
        # compare callable, so order the pairs by their token count.
        | 'Top Sentence' >> combine.Top.Of(1, key=lambda pair: pair[1])
        | 'Save Variable' >> beam.ParDo(SaveMaxFn()))

    # Block until the runner reports completion.
    result = p.run()
    result.wait_until_finish()
# Script entry point: make INFO records visible before running the pipeline.
if __name__ == '__main__':
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()
我有一个包含字符串的 PCollection。我想按空格拆分每个字符串，找出标记（token）数量最多的那个标记列表，并把它的大小以 int 的形式存储在一个变量中。
考虑这个示例输入:
# Sample input: three sentences that split into 5, 2 and 3 tokens.
sentences = [
    'This is the first sentence',
    'Second sentence',
    'Yet another sentence',
]

# Turn the in-memory list into a PCollection with one element per sentence.
with beam.Pipeline(options=PipelineOptions()) as p:
    pcoll = p | 'Create' >> beam.Create(sentences)
拆分后的句子为:
['This', 'is', 'the', 'first', 'sentence'] -> 5
['Second', 'sentence'] -> 2
['Yet', 'another', 'sentence'] -> 3
我想将值 5 存储在一个变量中。
我该怎么做？我看到了这篇博客文章（this blogpost），但它并不完全符合我的需求：作者只是打印了结果 PCollection，而我希望稍后在管道的其他阶段使用这个值。
您可以使用 Top.Of 转换来做到这一点。简而言之，我们先拆分每个句子，再计算每个句子的标记（token）数量。由于只需要排名第一的结果，我们让 Top.Of 只返回 1 个元素，并传入一个 lambda 函数作为比较依据，按标记数量对句子进行排序：
# Sample input; each sentence is split on single spaces below.
sentences = ['This is the first sentence',
             'Second sentence',
             'Yet another sentence']

# `p` is the beam.Pipeline these steps are attached to (created elsewhere).
longest_sentence = (
    p
    | 'Read Sentences' >> beam.Create(sentences)
    | 'Split into Words' >> beam.Map(lambda x: x.split(' '))
    # Pair every token list with its length: (tokens, token_count).
    | 'Map Token Length' >> beam.Map(lambda x: (x, len(x)))
    # Current Beam releases rank with `key=` instead of a two-argument
    # compare callable, so order the pairs by their token count.
    | 'Top Sentence' >> combine.Top.Of(1, key=lambda pair: pair[1])
    | 'Save Variable' >> beam.ParDo(SaveMaxFn()))
其中 SaveMaxFn 的定义如下：
class SaveMaxFn(beam.DoFn):
    """Store the winning token count in the module-level ``length`` variable.

    NOTE: the global lives in whichever process executes this DoFn. On a
    distributed runner that is a worker, not the launching program, so pass
    the value to later stages as a side input rather than reading the global.
    """

    def process(self, element):
        """Record and log the token count of the top-ranked sentence.

        Args:
            element: The list produced by ``Top.Of(1)``; its single entry is
                a ``(tokens, token_count)`` pair.

        Returns:
            ``element`` unchanged, so each entry is emitted downstream.
        """
        # Without this declaration the assignment below would only create a
        # local name that vanishes when process() returns.
        global length
        if not element:
            # The top-N list can be empty when there was nothing to rank;
            # indexing it would raise IndexError.
            logging.warning("No sentences to rank")
            return element
        length = element[0][1]
        logging.info("Longest sentence: %s token(s)", length)
        return element
而 length 是一个全局变量：
# Only has an effect inside the function that assigns `length`; at module
# level the statement is a no-op.
global length
结果:
INFO:root:Longest sentence: 5 token(s)
完整代码:
import argparse, logging
import apache_beam as beam
import apache_beam.transforms.combiners as combine
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.options.pipeline_options import SetupOptions
class SaveMaxFn(beam.DoFn):
    """Store the winning token count in the module-level ``length`` variable.

    NOTE: the global lives in whichever process executes this DoFn. On a
    distributed runner that is a worker, not the launching program, so pass
    the value to later stages as a side input rather than reading the global.
    """

    def process(self, element):
        """Record and log the token count of the top-ranked sentence.

        Args:
            element: The list produced by ``Top.Of(1)``; its single entry is
                a ``(tokens, token_count)`` pair.

        Returns:
            ``element`` unchanged, so each entry is emitted downstream.
        """
        # Without this declaration the assignment below would only create a
        # local name that vanishes when process() returns.
        global length
        if not element:
            # The top-N list can be empty when there was nothing to rank;
            # indexing it would raise IndexError.
            logging.warning("No sentences to rank")
            return element
        length = element[0][1]
        logging.info("Longest sentence: %s token(s)", length)
        return element
def run(argv=None):
    """Build and run the pipeline that finds the sentence with the most tokens.

    Args:
        argv: Command-line arguments. Anything argparse does not recognise
            is forwarded to Beam as pipeline options; ``None`` lets argparse
            fall back to ``sys.argv``.
    """
    parser = argparse.ArgumentParser()
    # No script-specific flags are defined, so only the leftovers matter.
    _, pipeline_args = parser.parse_known_args(argv)

    pipeline_options = PipelineOptions(pipeline_args)
    pipeline_options.view_as(SetupOptions).save_main_session = True

    p = beam.Pipeline(options=pipeline_options)

    sentences = ['This is the first sentence',
                 'Second sentence',
                 'Yet another sentence']

    longest_sentence = (
        p
        | 'Read Sentences' >> beam.Create(sentences)
        | 'Split into Words' >> beam.Map(lambda x: x.split(' '))
        # Pair every token list with its length: (tokens, token_count).
        | 'Map Token Length' >> beam.Map(lambda x: (x, len(x)))
        # Current Beam releases rank with `key=` instead of a two-argument
        # compare callable, so order the pairs by their token count.
        | 'Top Sentence' >> combine.Top.Of(1, key=lambda pair: pair[1])
        | 'Save Variable' >> beam.ParDo(SaveMaxFn()))

    # Block until the runner reports completion.
    result = p.run()
    result.wait_until_finish()
# Script entry point: make INFO records visible before running the pipeline.
if __name__ == '__main__':
    root_logger = logging.getLogger()
    root_logger.setLevel(logging.INFO)
    run()