确认后 GCP 消息停留在 Pub/Sub
GCP message stays in the Pub/Sub after acknowledge
我有 Pub/Sub 订阅逻辑封装在一个订阅方法中,该方法在每个订阅的服务初始化期间被调用一次:
def subscribe(self,
              callback: typing.Callable,
              subscription_name: str,
              topic_name: str,
              project_name: typing.Optional[str] = None) -> typing.Optional[SubscriberClient]:
    """Subscribe to a Pub/Sub topic, creating the subscription first if it is missing.

    :param callback: called for every message received on the subscription
    :param subscription_name: name of the subscription; created on ``topic_name`` if absent
    :param topic_name: name of the topic the subscription is attached to
    :param project_name: optional project name; falls back to ``self.pubsub_project_id``
    :return: ``self.pubsub_subscriber`` (this implementation never returns None)
    """
    project = project_name or self.pubsub_project_id
    self.logger.info('Subscribing to project `{}`, topic `{}`'.format(project, topic_name))
    client = self.pubsub_subscriber
    project_path = client.project_path(project)
    topic_path = client.topic_path(project, topic_name)
    subscription_path = client.subscription_path(project, subscription_name)

    # Create the subscription only if the project does not already have it;
    # any() stops paging through the listing as soon as a match is found.
    if not any(s.name == subscription_path for s in client.list_subscriptions(project_path)):
        self.logger.info('Creating new subscription `{}`, topic `{}`'.format(subscription_name, topic_name))
        client.create_subscription(subscription_path, topic_path)

    # NOTE(review): self.thread_scheduler is shared by every subscription this
    # method opens. Sharing one ThreadScheduler across subscriptions was found
    # to make ack() calls silently ineffective. Give each subscription its own
    # scheduler, or omit ``scheduler`` so the client builds one per call.
    # The streaming-pull future returned by subscribe() is not kept, so the
    # caller cannot cancel this subscription or observe its errors.
    client.subscribe(
        subscription_path, callback=callback,
        scheduler=self.thread_scheduler
    )
    return client
这个方法是这样调用的:
# Register pubsub_callback for messages on 'subscription_topic' (attached to 'topic').
self.subscribe_client = self.subscribe(
    topic_name='topic',
    subscription_name='subscription_topic',
    callback=self.pubsub_callback,
)
回调方法做了很多事情,发送了 2 封电子邮件然后确认消息
def pubsub_callback(self, data: gcloud_pubsub_subscriber.Message):
    """Process one Pub/Sub message, then acknowledge it.

    The message is acked whether or not processing succeeds. A message whose
    processing fails is logged with its size and payload and is not redelivered.

    :param data: the received Pub/Sub message
    """
    self.logger.debug('Processing pub sub message')
    try:
        self.do_something_with_message(data)
    except Exception:
        # ``except Exception`` instead of a bare ``except:`` so that
        # KeyboardInterrupt / SystemExit are not swallowed. The try body is
        # kept to the processing call only, so an ack failure is not
        # misreported as a processing failure.
        self.logger.warning({
            "message": "Failed to process Pub/Sub message",
            "request_size": data.size,
            "data": data.data
        }, exc_info=True)
        self.logger.debug('Acknowledging the message 2')
        data.ack()
    else:
        self.logger.debug('Acknowledging the message')
        data.ack()
        self.logger.debug('Acknowledged')
当我向订阅推送消息时,回调会运行并打印所有调试消息,包括 `Acknowledged`。但是,该消息仍然保留在 Pub/Sub 中,回调会被再次调用,且每次重试的间隔呈指数增长。问题是:为什么在调用 ack 之后,消息仍然保留在 Pub/Sub 中?
我有几个这样的订阅,它们都按预期工作。确认截止时间(ack deadline)不是原因:回调几乎立即完成,而且我也尝试过调整 ack 截止时间,没有任何帮助。
当我用一个在本地运行、连接到同一 Pub/Sub 的应用程序处理这些消息时,处理能正常完成,ack 也会按预期将消息从队列中移除。
- 所以问题只出现在已部署的服务中(运行在 kubernetes pod 中)
- 回调会执行,但 ack 似乎没有任何效果
- 通过本地运行的脚本(……做完全相同的事情)或通过 GCP 控制台确认消息,都能按预期工作。
有什么想法吗?
Pub/Sub 中的确认(ack)是尽力而为(best-effort)的,因此消息被重新投递是可能的,但并不常见。
如果您一直收到重复消息,可能是因为相同的消息内容被重复发布了。对 Pub/Sub 而言,这些是不同的消息,会被分配不同的消息 ID。请检查 Pub/Sub 提供的消息 ID,以确认您确实多次收到了同一条消息。
在使用 streaming pull(Python 客户端库所采用的方式)处理大量小消息的积压时,存在一个边缘情况(edge case)。如果您运行了多个订阅同一订阅的客户端,这个边缘情况可能与您的问题相关。
您还可以查看订阅的 Stackdriver 指标(metrics),确认:
- 其 ack 是否发送成功(`subscription/ack_message_count`)
- 其积压是否在减少(`subscription/backlog_bytes`)
- 订阅者是否错过了确认截止时间(`subscription/streaming_pull_ack_message_operation_count`,按 `response_code != "success"` 过滤)
如果您没有错过确认截止时间,且积压保持不变,您应该联系 Google Cloud 支持团队,并提供您的项目名称、订阅名称以及重复消息 ID 的示例。他们能够调查这些重复投递的原因。
我做了一些额外的测试,终于找到了问题所在。
TL;DR:我对所有订阅使用了同一个 `google.cloud.pubsub_v1.subscriber.scheduler.ThreadScheduler`。
以下是我用来测试它的代码片段。这是损坏的版本:
server.py
import concurrent.futures.thread
import os
import time
from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler
def create_subscription(project_id, topic_name, subscription_name):
    """Create a new pull subscription on the given topic."""
    client = pubsub_v1.SubscriberClient()
    created = client.create_subscription(
        client.subscription_path(project_id, subscription_name),
        client.topic_path(project_id, topic_name),
    )
    print('Subscription created: {}'.format(created))


def receive_messages(project_id, subscription_name, t_scheduler):
    """Start a streaming pull on *subscription_name*, running callbacks via *t_scheduler*.

    Every received message is printed and acked. The future returned by
    ``subscribe`` is not kept; the sleep loop at the bottom of this script is
    what keeps the process alive.
    """
    client = pubsub_v1.SubscriberClient()
    path = client.subscription_path(project_id, subscription_name)

    def on_message(msg):
        print('Received message: {}'.format(msg.data))
        msg.ack()

    client.subscribe(path, callback=on_message, scheduler=t_scheduler)
    print('Listening for messages on {}'.format(path))


project_id = os.getenv("PUBSUB_PROJECT_ID")
publisher = pubsub_v1.PublisherClient()
project_path = publisher.project_path(project_id)

# Create both topics, skipping any that already exist.
try:
    topics = [t.name.split('/')[-1] for t in publisher.list_topics(project_path)]
    for wanted_topic in ('topic_a', 'topic_b'):
        if wanted_topic not in topics:
            publisher.create_topic(publisher.topic_path(project_id, wanted_topic))
except AlreadyExists:
    print('Topics already exists')

# Create one subscription per topic, skipping any that already exist.
sub_client = pubsub_v1.SubscriberClient()
project_path = sub_client.project_path(project_id)
try:
    subs = [s.name.split('/')[-1] for s in sub_client.list_subscriptions(project_path)]
    for wanted_topic, wanted_sub in (('topic_a', 'topic_a_sub'), ('topic_b', 'topic_b_sub')):
        if wanted_sub not in subs:
            create_subscription(project_id, wanted_topic, wanted_sub)
except AlreadyExists:
    print('Subscriptions already exists')

# NOTE: this is the intentionally broken setup being demonstrated. One
# ThreadScheduler is shared by both subscriptions. Creating a separate
# scheduler inside each receive_messages() call is the change that made the
# acks take effect.
scheduler = ThreadScheduler(concurrent.futures.thread.ThreadPoolExecutor(10))
for wanted_sub in ('topic_a_sub', 'topic_b_sub'):
    receive_messages(project_id, wanted_sub, scheduler)

# Keep the main thread alive while messages are handled in the background.
while True:
    time.sleep(60)
client.py
import datetime
import os
import random
import sys
from time import sleep
from google.cloud import pubsub_v1
def publish_messages(pid, topic_name):
    """Publish nine timestamped messages (numbered 1-9) to a Pub/Sub topic.

    Sleeps a random 1.0-5.0 s between messages. Each publish is waited on, so
    a failed publish raises here instead of going unnoticed in the discarded
    future.

    :param pid: project ID
    :param topic_name: name of the topic to publish to
    """
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(pid, topic_name)
    for n in range(1, 10):
        data = '[{} - {}] Message number {}'.format(datetime.datetime.now().isoformat(), topic_name, n)
        future = publisher.publish(topic_path, data=data.encode('utf-8'))
        # Block until the publish completes; raises if it failed.
        future.result()
        sleep(random.randint(10, 50) / 10.0)


project_id = os.getenv("PUBSUB_PROJECT_ID")
publish_messages(project_id, sys.argv[1])
我连接到 Cloud Pub/Sub,服务器创建了主题和订阅。然后我针对两个主题并行多次运行客户端脚本。过了一会儿,当我修改服务器代码,改为在 `receive_messages` 内部实例化新的线程调度程序后,服务器就清空了两个主题的积压消息,并按预期运行。
令人困惑的是,无论哪种情况,服务器都会为所有消息打印出 `Received message`。
我会把这个问题提交到 https://github.com/googleapis/google-cloud-python/issues
我有 Pub/Sub 订阅逻辑封装在一个订阅方法中,该方法在每个订阅的服务初始化期间被调用一次:
def subscribe(self,
              callback: typing.Callable,
              subscription_name: str,
              topic_name: str,
              project_name: typing.Optional[str] = None) -> typing.Optional[SubscriberClient]:
    """Subscribe to a Pub/Sub topic, creating the subscription first if it is missing.

    :param callback: called for every message received on the subscription
    :param subscription_name: name of the subscription; created on ``topic_name`` if absent
    :param topic_name: name of the topic the subscription is attached to
    :param project_name: optional project name; falls back to ``self.pubsub_project_id``
    :return: ``self.pubsub_subscriber`` (this implementation never returns None)
    """
    project = project_name or self.pubsub_project_id
    self.logger.info('Subscribing to project `{}`, topic `{}`'.format(project, topic_name))
    client = self.pubsub_subscriber
    project_path = client.project_path(project)
    topic_path = client.topic_path(project, topic_name)
    subscription_path = client.subscription_path(project, subscription_name)

    # Create the subscription only if the project does not already have it;
    # any() stops paging through the listing as soon as a match is found.
    if not any(s.name == subscription_path for s in client.list_subscriptions(project_path)):
        self.logger.info('Creating new subscription `{}`, topic `{}`'.format(subscription_name, topic_name))
        client.create_subscription(subscription_path, topic_path)

    # NOTE(review): self.thread_scheduler is shared by every subscription this
    # method opens. Sharing one ThreadScheduler across subscriptions was found
    # to make ack() calls silently ineffective. Give each subscription its own
    # scheduler, or omit ``scheduler`` so the client builds one per call.
    # The streaming-pull future returned by subscribe() is not kept, so the
    # caller cannot cancel this subscription or observe its errors.
    client.subscribe(
        subscription_path, callback=callback,
        scheduler=self.thread_scheduler
    )
    return client
这个方法是这样调用的:
# Register pubsub_callback for messages on 'subscription_topic' (attached to 'topic').
self.subscribe_client = self.subscribe(
    topic_name='topic',
    subscription_name='subscription_topic',
    callback=self.pubsub_callback,
)
回调方法做了很多事情,发送了 2 封电子邮件然后确认消息
def pubsub_callback(self, data: gcloud_pubsub_subscriber.Message):
    """Process one Pub/Sub message, then acknowledge it.

    The message is acked whether or not processing succeeds. A message whose
    processing fails is logged with its size and payload and is not redelivered.

    :param data: the received Pub/Sub message
    """
    self.logger.debug('Processing pub sub message')
    try:
        self.do_something_with_message(data)
    except Exception:
        # ``except Exception`` instead of a bare ``except:`` so that
        # KeyboardInterrupt / SystemExit are not swallowed. The try body is
        # kept to the processing call only, so an ack failure is not
        # misreported as a processing failure.
        self.logger.warning({
            "message": "Failed to process Pub/Sub message",
            "request_size": data.size,
            "data": data.data
        }, exc_info=True)
        self.logger.debug('Acknowledging the message 2')
        data.ack()
    else:
        self.logger.debug('Acknowledging the message')
        data.ack()
        self.logger.debug('Acknowledged')
当我向订阅推送消息时,回调会运行并打印所有调试消息,包括 `Acknowledged`。但是,该消息仍然保留在 Pub/Sub 中,回调会被再次调用,且每次重试的间隔呈指数增长。问题是:为什么在调用 ack 之后,消息仍然保留在 Pub/Sub 中?
我有几个这样的订阅,它们都按预期工作。确认截止时间(ack deadline)不是原因:回调几乎立即完成,而且我也尝试过调整 ack 截止时间,没有任何帮助。
当我用一个在本地运行、连接到同一 Pub/Sub 的应用程序处理这些消息时,处理能正常完成,ack 也会按预期将消息从队列中移除。
- 所以问题只出现在已部署的服务中(运行在 kubernetes pod 中)
- 回调会执行,但 ack 似乎没有任何效果
- 通过本地运行的脚本(……做完全相同的事情)或通过 GCP 控制台确认消息,都能按预期工作。
有什么想法吗?
Pub/Sub 中的确认(ack)是尽力而为(best-effort)的,因此消息被重新投递是可能的,但并不常见。
如果您一直收到重复消息,可能是因为相同的消息内容被重复发布了。对 Pub/Sub 而言,这些是不同的消息,会被分配不同的消息 ID。请检查 Pub/Sub 提供的消息 ID,以确认您确实多次收到了同一条消息。
在使用 streaming pull(Python 客户端库所采用的方式)处理大量小消息的积压时,存在一个边缘情况(edge case)。如果您运行了多个订阅同一订阅的客户端,这个边缘情况可能与您的问题相关。
您还可以查看订阅的 Stackdriver 指标(metrics),确认:
- 其 ack 是否发送成功(`subscription/ack_message_count`)
- 其积压是否在减少(`subscription/backlog_bytes`)
- 订阅者是否错过了确认截止时间(`subscription/streaming_pull_ack_message_operation_count`,按 `response_code != "success"` 过滤)
如果您没有错过确认截止时间,且积压保持不变,您应该联系 Google Cloud 支持团队,并提供您的项目名称、订阅名称以及重复消息 ID 的示例。他们能够调查这些重复投递的原因。
我做了一些额外的测试,终于找到了问题所在。
TL;DR:我对所有订阅使用了同一个 `google.cloud.pubsub_v1.subscriber.scheduler.ThreadScheduler`。
以下是我用来测试它的代码片段。这是损坏的版本:
server.py
import concurrent.futures.thread
import os
import time
from google.api_core.exceptions import AlreadyExists
from google.cloud import pubsub_v1
from google.cloud.pubsub_v1.subscriber.scheduler import ThreadScheduler
def create_subscription(project_id, topic_name, subscription_name):
    """Create a new pull subscription on the given topic."""
    client = pubsub_v1.SubscriberClient()
    created = client.create_subscription(
        client.subscription_path(project_id, subscription_name),
        client.topic_path(project_id, topic_name),
    )
    print('Subscription created: {}'.format(created))


def receive_messages(project_id, subscription_name, t_scheduler):
    """Start a streaming pull on *subscription_name*, running callbacks via *t_scheduler*.

    Every received message is printed and acked. The future returned by
    ``subscribe`` is not kept; the sleep loop at the bottom of this script is
    what keeps the process alive.
    """
    client = pubsub_v1.SubscriberClient()
    path = client.subscription_path(project_id, subscription_name)

    def on_message(msg):
        print('Received message: {}'.format(msg.data))
        msg.ack()

    client.subscribe(path, callback=on_message, scheduler=t_scheduler)
    print('Listening for messages on {}'.format(path))


project_id = os.getenv("PUBSUB_PROJECT_ID")
publisher = pubsub_v1.PublisherClient()
project_path = publisher.project_path(project_id)

# Create both topics, skipping any that already exist.
try:
    topics = [t.name.split('/')[-1] for t in publisher.list_topics(project_path)]
    for wanted_topic in ('topic_a', 'topic_b'):
        if wanted_topic not in topics:
            publisher.create_topic(publisher.topic_path(project_id, wanted_topic))
except AlreadyExists:
    print('Topics already exists')

# Create one subscription per topic, skipping any that already exist.
sub_client = pubsub_v1.SubscriberClient()
project_path = sub_client.project_path(project_id)
try:
    subs = [s.name.split('/')[-1] for s in sub_client.list_subscriptions(project_path)]
    for wanted_topic, wanted_sub in (('topic_a', 'topic_a_sub'), ('topic_b', 'topic_b_sub')):
        if wanted_sub not in subs:
            create_subscription(project_id, wanted_topic, wanted_sub)
except AlreadyExists:
    print('Subscriptions already exists')

# NOTE: this is the intentionally broken setup being demonstrated. One
# ThreadScheduler is shared by both subscriptions. Creating a separate
# scheduler inside each receive_messages() call is the change that made the
# acks take effect.
scheduler = ThreadScheduler(concurrent.futures.thread.ThreadPoolExecutor(10))
for wanted_sub in ('topic_a_sub', 'topic_b_sub'):
    receive_messages(project_id, wanted_sub, scheduler)

# Keep the main thread alive while messages are handled in the background.
while True:
    time.sleep(60)
client.py
import datetime
import os
import random
import sys
from time import sleep
from google.cloud import pubsub_v1
def publish_messages(pid, topic_name):
    """Publish nine timestamped messages (numbered 1-9) to a Pub/Sub topic.

    Sleeps a random 1.0-5.0 s between messages. Each publish is waited on, so
    a failed publish raises here instead of going unnoticed in the discarded
    future.

    :param pid: project ID
    :param topic_name: name of the topic to publish to
    """
    publisher = pubsub_v1.PublisherClient()
    topic_path = publisher.topic_path(pid, topic_name)
    for n in range(1, 10):
        data = '[{} - {}] Message number {}'.format(datetime.datetime.now().isoformat(), topic_name, n)
        future = publisher.publish(topic_path, data=data.encode('utf-8'))
        # Block until the publish completes; raises if it failed.
        future.result()
        sleep(random.randint(10, 50) / 10.0)


project_id = os.getenv("PUBSUB_PROJECT_ID")
publish_messages(project_id, sys.argv[1])
我连接到 Cloud Pub/Sub,服务器创建了主题和订阅。然后我针对两个主题并行多次运行客户端脚本。过了一会儿,当我修改服务器代码,改为在 `receive_messages` 内部实例化新的线程调度程序后,服务器就清空了两个主题的积压消息,并按预期运行。
令人困惑的是,无论哪种情况,服务器都会为所有消息打印出 `Received message`。
我会把这个问题提交到 https://github.com/googleapis/google-cloud-python/issues