Beam / DataFlow ::ReadFromPubSub(id_label) :: 意外行为
Beam / DataFlow ::ReadFromPubSub(id_label) :: Unexpected behavior
有人可以阐明 ReafFromPubSub transform 中 id_label
参数的目的是什么吗?
我正在使用 BigQuery 接收器,据我所知,它就像一个用于 BQ Streaming API、Tabledata: insertAll
的 insertId
A unique ID for each row. BigQuery uses this property to detect duplicate insertion requests on a best-effort basis. For more information, see data consistency.
但是我没有看到这种预期的行为。
我正在向 Pub/Sub 发布消息,每条消息都具有相同的属性 message_id
值(这是为了测试管道/BQ 重复数据删除行为)
我的管道从 pubs 读取如下
beam.io.gcp.pubsub.ReadFromPubSub(topic=TOPIC,
subscription=None,
id_label='message_id'
)
但仍在查询 BQ,所有消息都已插入。我预计,因为每条发布的消息都具有相同的 message_id 值,BQ 应该推断出那些...
有人可以澄清一下吗?
提前致谢!
此外,我注意到 DirectRunner
在使用此属性时不断抛出错误,
NotImplementedError: DirectRunner: id_label is not supported for PubSub reads
我必须使用 DataflowRunner
...这也是预期的吗?
干杯!
更新 1:移至 DataflowRunner,管道似乎在 ReadFromPubSub() 期间遵守 id_label
参数。 但是,重复消息确实会继续偶尔读入管道。
我的发布者应用程序每 15 秒以以下格式发布消息 (the publisher app code is here):
cid=141&message_id=2&evt_time={{DATE_TIME_AT_RUNTIME}}
注意,我也在消息的属性中传递了相同的 message_id
值(='2')(这是为了尝试,测试推断行为)。
- 我的管道(运行 on Dataflow Runner,beam Python v2.11 SDK,pipeline code is here),将以下消息转储到 BQ。如您所见,具有相同
message_id
的多条消息被读入管道并发送到接收器。这通常发生在我 stop/restart 我的发布者应用程序时。
cid=141&message_id=2&evt_time=2019-03-17T09:31:15.792653Z
cid=141&message_id=2&evt_time=2019-03-17T09:30:00.767878Z
cid=141&message_id=2&evt_time=2019-03-17T09:28:30.747951Z
cid=141&message_id=2&evt_time=2019-03-17T09:22:30.668764Z
cid=141&message_id=2&evt_time=2019-03-17T09:21:00.646867Z
cid=141&message_id=2&evt_time=2019-03-17T09:19:45.630280Z
cid=141&message_id=2&evt_time=2019-03-17T09:12:05.466953Z
cid=141&message_id=2&evt_time=2019-03-17T09:10:42.956195Z
cid=141&message_id=2&evt_time=2019-03-17T09:01:42.816151Z
这些是不同的 ID。正如 here, every message published to a topic has a field named messageId
that is guaranteed to be unique within the topic. Pub/Sub guarantees at-least-once delivery so a subscription can have duplicates (i.e. messages with the same messageId
). Dataflow has exactly-once processing 语义所解释的那样,因为它在从订阅中读取时使用该字段来删除重复消息。这独立于接收器,不需要是 BigQuery。
使用id_label
(或Java SDK 中的.withIdAttribute()
)我们可以根据本应唯一的不同字段(例如订单ID、客户 ID 等)。输入源只会读取重复的消息一次,您不会看到它们增加了管道中输入元素的数量。请记住,Direct Runner 适用于 testing purposes only and does not offer the same guarantees in terms of checkpointing, de-duplication, etc. As an example refer to this comment。这是您在管道中看到它们的最可能原因,同时考虑到 NotImplementedError
消息,因此我建议您转向 Dataflow Runner。
另一方面,insertId
用于 best-effort basis, to avoid duplicate rows when retrying streaming inserts in BigQuery. Using BigQueryIO
it is created ,无法手动指定。在您的情况下,如果您的 N 条消息进入管道并且 N 条消息写入 BigQuery,则它按预期工作。如果必须重试,则该行具有相同的 insertId
,因此被丢弃。
有人可以阐明 ReafFromPubSub transform 中 id_label
参数的目的是什么吗?
我正在使用 BigQuery 接收器,据我所知,它就像一个用于 BQ Streaming API、Tabledata: insertAll
的insertId
A unique ID for each row. BigQuery uses this property to detect duplicate insertion requests on a best-effort basis. For more information, see data consistency.
但是我没有看到这种预期的行为。
我正在向 Pub/Sub 发布消息,每条消息都具有相同的属性
message_id
值(这是为了测试管道/BQ 重复数据删除行为)我的管道从 pubs 读取如下
beam.io.gcp.pubsub.ReadFromPubSub(topic=TOPIC, subscription=None, id_label='message_id'
)
但仍在查询 BQ,所有消息都已插入。我预计,因为每条发布的消息都具有相同的 message_id 值,BQ 应该推断出那些...
有人可以澄清一下吗? 提前致谢!
此外,我注意到 DirectRunner
在使用此属性时不断抛出错误,
NotImplementedError: DirectRunner: id_label is not supported for PubSub reads
我必须使用 DataflowRunner
...这也是预期的吗?
干杯!
更新 1:移至 DataflowRunner,管道似乎在 ReadFromPubSub() 期间遵守 id_label
参数。 但是,重复消息确实会继续偶尔读入管道。
我的发布者应用程序每 15 秒以以下格式发布消息 (the publisher app code is here):
cid=141&message_id=2&evt_time={{DATE_TIME_AT_RUNTIME}}
注意,我也在消息的属性中传递了相同的 message_id
值(='2')(这是为了尝试,测试推断行为)。
- 我的管道(运行 on Dataflow Runner,beam Python v2.11 SDK,pipeline code is here),将以下消息转储到 BQ。如您所见,具有相同
message_id
的多条消息被读入管道并发送到接收器。这通常发生在我 stop/restart 我的发布者应用程序时。
cid=141&message_id=2&evt_time=2019-03-17T09:31:15.792653Z
cid=141&message_id=2&evt_time=2019-03-17T09:30:00.767878Z
cid=141&message_id=2&evt_time=2019-03-17T09:28:30.747951Z
cid=141&message_id=2&evt_time=2019-03-17T09:22:30.668764Z
cid=141&message_id=2&evt_time=2019-03-17T09:21:00.646867Z
cid=141&message_id=2&evt_time=2019-03-17T09:19:45.630280Z
cid=141&message_id=2&evt_time=2019-03-17T09:12:05.466953Z
cid=141&message_id=2&evt_time=2019-03-17T09:10:42.956195Z
cid=141&message_id=2&evt_time=2019-03-17T09:01:42.816151Z
这些是不同的 ID。正如 here, every message published to a topic has a field named messageId
that is guaranteed to be unique within the topic. Pub/Sub guarantees at-least-once delivery so a subscription can have duplicates (i.e. messages with the same messageId
). Dataflow has exactly-once processing 语义所解释的那样,因为它在从订阅中读取时使用该字段来删除重复消息。这独立于接收器,不需要是 BigQuery。
使用id_label
(或Java SDK 中的.withIdAttribute()
)我们可以根据本应唯一的不同字段(例如订单ID、客户 ID 等)。输入源只会读取重复的消息一次,您不会看到它们增加了管道中输入元素的数量。请记住,Direct Runner 适用于 testing purposes only and does not offer the same guarantees in terms of checkpointing, de-duplication, etc. As an example refer to this comment。这是您在管道中看到它们的最可能原因,同时考虑到 NotImplementedError
消息,因此我建议您转向 Dataflow Runner。
另一方面,insertId
用于 best-effort basis, to avoid duplicate rows when retrying streaming inserts in BigQuery. Using BigQueryIO
it is created insertId
,因此被丢弃。