Google PubSub 线程安全和忽略重复消息
Google PubSub thread safety and ignoring duplicate messages
我正在收听Google云平台发布的财经数据,周一至周五。我想将所有消息保存到磁盘。我在 Python.
中这样做
如果我的应用程序出现故障,我需要恢复所有丢失的数据包。我知道 Google 会自动重新发送未确认的消息。
GCP 文档列出了许多可用的订阅技术(Asynchronous/Synchronous、Push/Pull、Streaming pull 等)。有异步示例代码:
def callback(message):
print(f"Received {message}.")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=5)
except TimeoutError:
streaming_pull_future.cancel()
https://cloud.google.com/pubsub/docs/pull
- 回调thread-safe/is是否只有一个线程回调?
- 忽略已收到消息的最佳方法是什么?客户需要维护地图吗?
Kamal Aboul-Hosn 更新
我想我可以坚持,但我的问题是我需要手动检查是否确实收到了所有消息。为此,我启用了订购送货。我们的消息数据包含一个序列号,所以我想添加一个全局变量,如 next_expected_seq_num
。收到每条消息后,我将处理并确认消息并递增 next_expected_seq_num
.
但是,如果我说有 10 个线程调用回调方法,我假设这 10 个线程中的任何一个都可以包含下一条消息?而且我必须使我的回调方法足够智能,以阻止其他 9 个线程的处理,而第 10 个线程处理下一条消息。类似于:
(伪代码)
def callback(msg)
{
seq_num = getSeqNum(msg.data);
while(seq_num != next_expected_seq_num); // Make atomic
// When we reach here, we have the next message
assert(db.exists(seq_num) == false);
// persist message
++next_expected_seq_num; // make atomic/cannot be earlier
msg.ack();
}
既然我要阻止多线程,我是否应该只禁用多个回调线程?
有没有更好的方法来check/guarantee我们处理每条消息?
我想知道我们是否应该像 TCP 一样信任 GCP,启用多线程(并只锁定数据库写入)?
def callback(msg)
{
seq_num = getSeqNum(msg.data);
lock();
if(db.exists(seq_num) == false)
{
// persist message
}
unlock();
msg.ack();
}
如果您 运行 处于没有全局解释器锁的 Python 环境中,则回调不是线程安全的。在这种情况下可以并行执行多个回调,您必须使用锁来保护任何共享数据结构。
由于云 Pub/Sub 具有至少一次传递语义,如果您需要忽略重复的消息,那么是的,您将需要使用已接收的消息维护某种数据结构。请注意,重复项可能会在订阅者重新启动时传送。因此,您可能需要将其作为某种持久存储。 Redis 往往是此类重复数据删除的流行选择。
通过有序传递,保证回调一次只会 运行 一个排序键的一条消息。因此,您不必编写期望多条消息同时为密钥 运行ning 的程序。请注意,通常情况下,使用排序键对主题中的所有消息进行完全排序只有在您的吞吐量不超过 1MB/s 时才有效,因为这是具有排序键的消息的发布限制。此外,只有在按顺序处理消息很重要的情况下才使用排序键。
关于何时使用多线程,这实际上取决于处理的性质。如果大部分回调都需要用锁来保护,那么多线程就无济于事了。但是,如果只有一小部分需要锁保护,例如检查重复项,而大部分处理可以安全地并行完成,那么多线程可以带来更好的性能。
如果您只想防止重复,那么您可能不需要使用锁来保护对数据库的写入,除非数据库不保证一致性。另外,请记住,锁定只有在您只有一个订户客户端时才有用。
我正在收听Google云平台发布的财经数据,周一至周五。我想将所有消息保存到磁盘。我在 Python.
中这样做如果我的应用程序出现故障,我需要恢复所有丢失的数据包。我知道 Google 会自动重新发送未确认的消息。
GCP 文档列出了许多可用的订阅技术(Asynchronous/Synchronous、Push/Pull、Streaming pull 等)。有异步示例代码:
def callback(message):
print(f"Received {message}.")
message.ack()
streaming_pull_future = subscriber.subscribe(subscription_path, callback=callback)
print(f"Listening for messages on {subscription_path}..\n")
# Wrap subscriber in a 'with' block to automatically call close() when done.
with subscriber:
try:
# When `timeout` is not set, result() will block indefinitely,
# unless an exception is encountered first.
streaming_pull_future.result(timeout=5)
except TimeoutError:
streaming_pull_future.cancel()
https://cloud.google.com/pubsub/docs/pull
- 回调thread-safe/is是否只有一个线程回调?
- 忽略已收到消息的最佳方法是什么?客户需要维护地图吗?
Kamal Aboul-Hosn 更新
我想我可以坚持,但我的问题是我需要手动检查是否确实收到了所有消息。为此,我启用了订购送货。我们的消息数据包含一个序列号,所以我想添加一个全局变量,如 next_expected_seq_num
。收到每条消息后,我将处理并确认消息并递增 next_expected_seq_num
.
但是,如果我说有 10 个线程调用回调方法,我假设这 10 个线程中的任何一个都可以包含下一条消息?而且我必须使我的回调方法足够智能,以阻止其他 9 个线程的处理,而第 10 个线程处理下一条消息。类似于:
(伪代码)
def callback(msg)
{
seq_num = getSeqNum(msg.data);
while(seq_num != next_expected_seq_num); // Make atomic
// When we reach here, we have the next message
assert(db.exists(seq_num) == false);
// persist message
++next_expected_seq_num; // make atomic/cannot be earlier
msg.ack();
}
既然我要阻止多线程,我是否应该只禁用多个回调线程?
有没有更好的方法来check/guarantee我们处理每条消息?
我想知道我们是否应该像 TCP 一样信任 GCP,启用多线程(并只锁定数据库写入)?
def callback(msg)
{
seq_num = getSeqNum(msg.data);
lock();
if(db.exists(seq_num) == false)
{
// persist message
}
unlock();
msg.ack();
}
如果您 运行 处于没有全局解释器锁的 Python 环境中,则回调不是线程安全的。在这种情况下可以并行执行多个回调,您必须使用锁来保护任何共享数据结构。
由于云 Pub/Sub 具有至少一次传递语义,如果您需要忽略重复的消息,那么是的,您将需要使用已接收的消息维护某种数据结构。请注意,重复项可能会在订阅者重新启动时传送。因此,您可能需要将其作为某种持久存储。 Redis 往往是此类重复数据删除的流行选择。
通过有序传递,保证回调一次只会 运行 一个排序键的一条消息。因此,您不必编写期望多条消息同时为密钥 运行ning 的程序。请注意,通常情况下,使用排序键对主题中的所有消息进行完全排序只有在您的吞吐量不超过 1MB/s 时才有效,因为这是具有排序键的消息的发布限制。此外,只有在按顺序处理消息很重要的情况下才使用排序键。
关于何时使用多线程,这实际上取决于处理的性质。如果大部分回调都需要用锁来保护,那么多线程就无济于事了。但是,如果只有一小部分需要锁保护,例如检查重复项,而大部分处理可以安全地并行完成,那么多线程可以带来更好的性能。
如果您只想防止重复,那么您可能不需要使用锁来保护对数据库的写入,除非数据库不保证一致性。另外,请记住,锁定只有在您只有一个订户客户端时才有用。