Beam / DataFlow ::ReadFromPubSub(id_label) :: 意外行为

Beam / DataFlow ::ReadFromPubSub(id_label) :: Unexpected behavior

有人可以阐明 ReafFromPubSub transformid_label 参数的目的是什么吗?

我正在使用 BigQuery 接收器,据我所知,它就像一个用于 BQ Streaming API、Tabledata: insertAll

insertId

A unique ID for each row. BigQuery uses this property to detect duplicate insertion requests on a best-effort basis. For more information, see data consistency.

但是我没有看到这种预期的行为。

但仍在查询 BQ,所有消息都已插入。我预计,因为每条发布的消息都具有相同的 message_id 值,BQ 应该推断出那些...

有人可以澄清一下吗? 提前致谢!

此外,我注意到 DirectRunner 在使用此属性时不断抛出错误,

NotImplementedError: DirectRunner: id_label is not supported for PubSub reads

我必须使用 DataflowRunner...这也是预期的吗?

干杯!

更新 1:移至 DataflowRunner,管道似乎在 ReadFromPubSub() 期间遵守 id_label 参数。 但是,重复消息确实会继续偶尔读入管道

注意,我也在消息的属性中传递了相同的 message_id 值(='2')(这是为了尝试,测试推断行为)。

cid=141&message_id=2&evt_time=2019-03-17T09:31:15.792653Z
cid=141&message_id=2&evt_time=2019-03-17T09:30:00.767878Z
cid=141&message_id=2&evt_time=2019-03-17T09:28:30.747951Z
cid=141&message_id=2&evt_time=2019-03-17T09:22:30.668764Z
cid=141&message_id=2&evt_time=2019-03-17T09:21:00.646867Z
cid=141&message_id=2&evt_time=2019-03-17T09:19:45.630280Z
cid=141&message_id=2&evt_time=2019-03-17T09:12:05.466953Z
cid=141&message_id=2&evt_time=2019-03-17T09:10:42.956195Z
cid=141&message_id=2&evt_time=2019-03-17T09:01:42.816151Z

这些是不同的 ID。正如 here, every message published to a topic has a field named messageId that is guaranteed to be unique within the topic. Pub/Sub guarantees at-least-once delivery so a subscription can have duplicates (i.e. messages with the same messageId). Dataflow has exactly-once processing 语义所解释的那样,因为它在从订阅中读取时使用该字段来删除重复消息。这独立于接收器,不需要是 BigQuery。

使用id_label(或Java SDK 中的.withIdAttribute())我们可以根据本应唯一的不同字段(例如订单ID、客户 ID 等)。输入源只会读取重复的消息一次,您不会看到它们增加了管道中输入元素的数量。请记住,Direct Runner 适用于 testing purposes only and does not offer the same guarantees in terms of checkpointing, de-duplication, etc. As an example refer to this comment。这是您在管道中看到它们的最可能原因,同时考虑到 NotImplementedError 消息,因此我建议您转向 Dataflow Runner。

另一方面,insertId 用于 best-effort basis, to avoid duplicate rows when retrying streaming inserts in BigQuery. Using BigQueryIO it is created ,无法手动指定。在您的情况下,如果您的 N 条消息进入管道并且 N 条消息写入 BigQuery,则它按预期工作。如果必须重试,则该行具有相同的 insertId,因此被丢弃。