侧输入数据未更新 - Python Apache Beam
Side Input data doesn't get updated - Python Apache Beam
我正在构建一个包含动态配置数据的管道,只要被触发就会更新。
有 2 个 PubSub 主题,主题 A 用于 IoT 数据,主题 B 用于将用于转换 IoT 数据的配置。
配置保存在 Cloud Firestore 中。当数据库更新时,Cloud Function 将读取更新后的配置并将其发送到 PubSub 主题 B。
问题是 Dataflow 作业只在作业开始时读取配置数据,永远不会更新。
我怎样才能更新侧输入?
p = beam.Pipeline(options=options)
class Transform(beam.DoFn):
def process(self, configuration):
...
yield output
def run():
...
iot_data = (p
| 'ReadIotData' >> ReadFromPubSub(TOPIC_A)
configuration = (p
| 'ReadConfig' >> ReadFromPubSub(TOPIC_B)
| 'WindowUserData' >> beam.WindowInto(
window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterCount(1)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| 'JsonLoadsUserData' >> beam.Map(lambda x: ('data', x.decode().replace('\','')))
output = (iot_data
| 'transform' >> beam.ParDo(Transform(),
beam.pvalue.AsDict(configuration))
| 'Output' >> WriteToPubSub(TOPIC_C)
我会尝试 运行 这个管道 https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2,它对侧输入有更好的支持。
我正在构建一个包含动态配置数据的管道,只要被触发就会更新。
有 2 个 PubSub 主题,主题 A 用于 IoT 数据,主题 B 用于将用于转换 IoT 数据的配置。
配置保存在 Cloud Firestore 中。当数据库更新时,Cloud Function 将读取更新后的配置并将其发送到 PubSub 主题 B。
问题是 Dataflow 作业只在作业开始时读取配置数据,永远不会更新。
我怎样才能更新侧输入?
p = beam.Pipeline(options=options)
class Transform(beam.DoFn):
def process(self, configuration):
...
yield output
def run():
...
iot_data = (p
| 'ReadIotData' >> ReadFromPubSub(TOPIC_A)
configuration = (p
| 'ReadConfig' >> ReadFromPubSub(TOPIC_B)
| 'WindowUserData' >> beam.WindowInto(
window.GlobalWindows(),
trigger=trigger.Repeatedly(trigger.AfterCount(1)),
accumulation_mode=trigger.AccumulationMode.DISCARDING)
| 'JsonLoadsUserData' >> beam.Map(lambda x: ('data', x.decode().replace('\','')))
output = (iot_data
| 'transform' >> beam.ParDo(Transform(),
beam.pvalue.AsDict(configuration))
| 'Output' >> WriteToPubSub(TOPIC_C)
我会尝试 运行 这个管道 https://cloud.google.com/dataflow/docs/guides/deploying-a-pipeline#dataflow-runner-v2,它对侧输入有更好的支持。