Google Cloud DataFlow PubSubIO 未读取完整主题
Google Cloud DataFlow PubSubIO doesn't read from a full topic
我正在尝试 运行 Google Cloud DataFlow 中的管道,"Streaming" 模式。
管道应该从 PubSub 主题读取,但它实际上并没有从主题读取,直到我删除它,重新创建它并在管道启动后将所有消息重新发布到主题。
有没有办法让管道读取已经发布的消息?
听起来好像提供 Pub/Sub 订阅(Pub/Sub I/O documentation 中有更多详细信息)可以解决您的问题。订阅创建后消息将被缓冲,允许在管道启动时读取这些消息。
请使用云控制台在发布订阅中创建自定义订阅。
在代码中尝试这样的事情。
PCollection<TableRow> datastream = p.apply(PubsubIO.Read.named("Read device iot data from PubSub")
.subscription(String.format("projects/%s/subscriptions/%s",<ProjectId>,<Subscriptionname>))
.timestampLabel("ts")
.withCoder(TableRowJsonCoder.of()));
请注意,当您订阅时,您可以订阅主题或订阅名称。
在上面的代码中,我订阅了我在发布子控制台中明确创建的订阅。
进行显式订阅的好处是,它存储从 pub sub 拉取的数据,即使你的数据流代码是 offline.So 数据也不会丢失。
我正在尝试 运行 Google Cloud DataFlow 中的管道,"Streaming" 模式。 管道应该从 PubSub 主题读取,但它实际上并没有从主题读取,直到我删除它,重新创建它并在管道启动后将所有消息重新发布到主题。
有没有办法让管道读取已经发布的消息?
听起来好像提供 Pub/Sub 订阅(Pub/Sub I/O documentation 中有更多详细信息)可以解决您的问题。订阅创建后消息将被缓冲,允许在管道启动时读取这些消息。
请使用云控制台在发布订阅中创建自定义订阅。 在代码中尝试这样的事情。
PCollection<TableRow> datastream = p.apply(PubsubIO.Read.named("Read device iot data from PubSub")
.subscription(String.format("projects/%s/subscriptions/%s",<ProjectId>,<Subscriptionname>))
.timestampLabel("ts")
.withCoder(TableRowJsonCoder.of()));
请注意,当您订阅时,您可以订阅主题或订阅名称。
在上面的代码中,我订阅了我在发布子控制台中明确创建的订阅。 进行显式订阅的好处是,它存储从 pub sub 拉取的数据,即使你的数据流代码是 offline.So 数据也不会丢失。