从多个 pubsub 读取数据到同一个 bigquery
Reading data from multiple pubusb to same bigquery
这个问题与理解在 apache beam 中连接 gcp 管道的语法更相关。这是我当前管道的设置方式
options = dataflow_options(project_id=project_id, topic_name=topic_name, job_name=job_name)
p = apache_beam.Pipeline(options=options)
(p
| 'read pubusb' >> apache_beam.io.ReadFromPubSub(topic=topic_path, with_attributes=True)
| 'decode the message' >> apache_beam.ParDo(mydecoder())
| 'persist to db' >> apache_beam.io.WriteToBigQuery(
output_table,
create_disposition=apache_beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=apache_beam.io.BigQueryDisposition.WRITE_APPEND
))
p.run()
有了这个,我可以创建一个看起来像这样的管道。
现在我真正想做的(假设我的解码器是相同的)是将多个 pubsub 连接到同一个解码器,即
我怎样才能在 apache beam 中实现这个
有几件事我忘了说
- 所有题目基本都是字节流
- 读topics时数据之间没有common key
- 每个主题都有不同的解码逻辑
我正在查看 CoGroupby
,但它需要一个通用密钥。
使用flatten将多个PCollection合并为一个:
# Flatten takes a tuple of PCollection objects.
# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.
merged = (
(pcoll1, pcoll2, pcoll3)
# A list of tuples can be "piped" directly into a Flatten transform.
| beam.Flatten())
这个问题与理解在 apache beam 中连接 gcp 管道的语法更相关。这是我当前管道的设置方式
options = dataflow_options(project_id=project_id, topic_name=topic_name, job_name=job_name)
p = apache_beam.Pipeline(options=options)
(p
| 'read pubusb' >> apache_beam.io.ReadFromPubSub(topic=topic_path, with_attributes=True)
| 'decode the message' >> apache_beam.ParDo(mydecoder())
| 'persist to db' >> apache_beam.io.WriteToBigQuery(
output_table,
create_disposition=apache_beam.io.BigQueryDisposition.CREATE_IF_NEEDED,
write_disposition=apache_beam.io.BigQueryDisposition.WRITE_APPEND
))
p.run()
有了这个,我可以创建一个看起来像这样的管道。
现在我真正想做的(假设我的解码器是相同的)是将多个 pubsub 连接到同一个解码器,即
我怎样才能在 apache beam 中实现这个
有几件事我忘了说
- 所有题目基本都是字节流
- 读topics时数据之间没有common key
- 每个主题都有不同的解码逻辑
我正在查看 CoGroupby
,但它需要一个通用密钥。
使用flatten将多个PCollection合并为一个:
# Flatten takes a tuple of PCollection objects.
# Returns a single PCollection that contains all of the elements in the PCollection objects in that tuple.
merged = (
(pcoll1, pcoll2, pcoll3)
# A list of tuples can be "piped" directly into a Flatten transform.
| beam.Flatten())