Unable to view output for beam.combiners.Count.PerElement() in Dataflow
I have a Pub/Sub script that publishes male first names, as follows:
from google.cloud import pubsub_v1
import names
project_id = "Your-Project-Name"
topic_name = "Your-Topic-Name"
publisher = pubsub_v1.PublisherClient()
topic_path = publisher.topic_path(project_id, topic_name)
while True:
    data = names.get_first_name(gender='male')
    data = data.encode("utf-8")
    publisher.publish(topic_path, data=data)
Then I have a Dataflow pipeline that reads from a subscription attached to that topic and counts each element, as follows:
import logging, re
import apache_beam as beam
from apache_beam.options.pipeline_options import PipelineOptions
root = logging.getLogger()
root.setLevel(logging.INFO)
p = beam.Pipeline(options=PipelineOptions())
x = (
    p
    | beam.io.ReadFromPubSub(topic=None, subscription="projects/YOUR-PROJECT-NAME/subscriptions/YOUR-SUBSCRIPTION-NAME").with_output_types(bytes)
    | 'Decode_UTF-8' >> beam.Map(lambda x: x.decode('utf-8'))
    | 'ExtractWords' >> beam.FlatMap(lambda x: re.findall(r'[A-Za-z\']+', x))
    | 'CountingElem' >> beam.combiners.Count.PerElement()
    | 'FormatOutput' >> beam.MapTuple(lambda word, count: '%s: %s' % (word, count))
    | 'Printing2Log' >> beam.Map(lambda k: logging.info(k)))
result = p.run()
result.wait_until_finish()
The problem: I get no output from the last three steps of the pipeline, even though I can see data flowing out of the first three steps, which means nothing ever gets logged.
我期望这样的输出:
Peter: 2
Glen: 1
Alex: 1
Ryan: 2
Thanks in advance for your help.
Since this is a streaming pipeline, you need to set up windowing/triggering appropriately for the pipeline to work. See:
https://beam.apache.org/documentation/programming-guide/#windowing
More specifically:
Caution: Beam’s default windowing behavior is to assign all elements
of a PCollection to a single, global window and discard late data,
even for unbounded PCollections. Before you use a grouping transform
such as GroupByKey on an unbounded PCollection, you must do at least
one of the following:
This caution applies here because beam.combiners.Count.PerElement() contains a GroupByKey.