订单 Google 云 Pub/Sub 消息 - java 示例程序
Order Google Cloud Pub/Sub messages - java sample program
我正在尝试编写一个简单的消费者 java 程序,它使用来自 Google 云 Pub/Sub 的消息并对消息进行重复数据删除和排序。
我没能找到执行此操作的简单示例程序。
我已阅读 google 文档,他们建议用户使用 Apache Beam。但是我不熟悉 Apache Beam,我想要一个基本的示例程序来演示此功能。简单地获取比较器并知道删除重复消息并给出按属性排序的消息的东西。
有人可以提供这样的示例 java 程序吗?
如果什么都不存在,那是因为 "really" 不可能。
1、问"When Pubsub generate double values?"很有用。仅当消息已发送但未收到确认(或未在预期时间范围内发出,默认为 10 秒),或在推送模式下未收到 HTTP 200。
第二:什么是Beam? Beam 是一个流水线引擎。您可以将您的 PubSub 插入其中,然后
您的管道将读取消息并对它们进行重复数据删除。请注意,Beam 在 windows 的 10 到 20 分钟内执行了此重复数据删除。
3rd:"ordered"是什么意思?查看您的消息 ID。该值是一个时间戳,以微秒为单位(这就是 PubSub 每秒可以摄取多达 1M 消息的原因)。有序消息意味着有一条消息用于顺序 ID,否则放入缓冲区并等待填充空白。当然,差距永远不会被填补...
回到 Beam。 Beam 有能力定义 windows of observation。顺便说一句,你可以定义,例如,滑动windows 5分钟,每个windows每分钟开始。当 window 关闭时,将触发 PCollection 消息并将其处理到您的管道中。在这个有限的集合上,您可以订购您的信息。
同理,您也可以手动删除本合集中的depublicates。
最后一个信息,PubSub 是 backbone 的 Google 服务。它发展缓慢,因为它很关键。但是,也许你的需求会在一天内发布!
云 Pub/Sub 现在 supports ordered delivery. The feature is GA as of October 2020. To order messages, you set the enable_ordered_delivery
property on a subscription to true
and you set the ordering_key
property on messages you want ordered (Java sample)。具有相同排序键的所有消息都按照服务接收它们的顺序传递给订阅者。请注意,Dataflow 尚不能利用此功能。
重复数据删除仍需由客户端完成,但使用有序传递应该更容易,因为您可以更轻松地跟踪哪些消息已传递。如果您不能容忍重复,那么您可能需要永久存储您已处理的消息列表(或您最近处理的消息),以便检测重复消息并丢弃它们。
实现 MessageReceiver
class 的简单 class 仅执行 in-memory 重复数据删除可能看起来像这样:
public class DedupingSubscriber implements MessageReceiver {
ConcurrentMap<String, Long> mostRecentPerKey = new ConcurrentHashMap<>();
@Override
void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
Long keyTime = mostRecentPerKey.get(message.getOrderingKey());
Long messageTime = Timestamps.toNanos(message.getPublishTime());
if (keyTime != null && messageTime.compareTo(keyTime) > 0) {
// This message has not been processed.
// processMessage(message); // Do what needs to be done with message.
mostRecentPerKey.put(message.getOrderingKey(), messageTime);
}
consumer.ack();
}
}
我正在尝试编写一个简单的消费者 java 程序,它使用来自 Google 云 Pub/Sub 的消息并对消息进行重复数据删除和排序。
我没能找到执行此操作的简单示例程序。 我已阅读 google 文档,他们建议用户使用 Apache Beam。但是我不熟悉 Apache Beam,我想要一个基本的示例程序来演示此功能。简单地获取比较器并知道删除重复消息并给出按属性排序的消息的东西。
有人可以提供这样的示例 java 程序吗?
如果什么都不存在,那是因为 "really" 不可能。
1、问"When Pubsub generate double values?"很有用。仅当消息已发送但未收到确认(或未在预期时间范围内发出,默认为 10 秒),或在推送模式下未收到 HTTP 200。
第二:什么是Beam? Beam 是一个流水线引擎。您可以将您的 PubSub 插入其中,然后 您的管道将读取消息并对它们进行重复数据删除。请注意,Beam 在 windows 的 10 到 20 分钟内执行了此重复数据删除。
3rd:"ordered"是什么意思?查看您的消息 ID。该值是一个时间戳,以微秒为单位(这就是 PubSub 每秒可以摄取多达 1M 消息的原因)。有序消息意味着有一条消息用于顺序 ID,否则放入缓冲区并等待填充空白。当然,差距永远不会被填补...
回到 Beam。 Beam 有能力定义 windows of observation。顺便说一句,你可以定义,例如,滑动windows 5分钟,每个windows每分钟开始。当 window 关闭时,将触发 PCollection 消息并将其处理到您的管道中。在这个有限的集合上,您可以订购您的信息。
同理,您也可以手动删除本合集中的depublicates。
最后一个信息,PubSub 是 backbone 的 Google 服务。它发展缓慢,因为它很关键。但是,也许你的需求会在一天内发布!
云 Pub/Sub 现在 supports ordered delivery. The feature is GA as of October 2020. To order messages, you set the enable_ordered_delivery
property on a subscription to true
and you set the ordering_key
property on messages you want ordered (Java sample)。具有相同排序键的所有消息都按照服务接收它们的顺序传递给订阅者。请注意,Dataflow 尚不能利用此功能。
重复数据删除仍需由客户端完成,但使用有序传递应该更容易,因为您可以更轻松地跟踪哪些消息已传递。如果您不能容忍重复,那么您可能需要永久存储您已处理的消息列表(或您最近处理的消息),以便检测重复消息并丢弃它们。
实现 MessageReceiver
class 的简单 class 仅执行 in-memory 重复数据删除可能看起来像这样:
public class DedupingSubscriber implements MessageReceiver {
ConcurrentMap<String, Long> mostRecentPerKey = new ConcurrentHashMap<>();
@Override
void receiveMessage(PubsubMessage message, AckReplyConsumer consumer) {
Long keyTime = mostRecentPerKey.get(message.getOrderingKey());
Long messageTime = Timestamps.toNanos(message.getPublishTime());
if (keyTime != null && messageTime.compareTo(keyTime) > 0) {
// This message has not been processed.
// processMessage(message); // Do what needs to be done with message.
mostRecentPerKey.put(message.getOrderingKey(), messageTime);
}
consumer.ack();
}
}