Constantly polling SQS Queue using infinite loop
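I have an SQS queue that I need to monitor constantly for incoming messages. Once a message arrives, I do some processing and then go back to waiting for the next one. I do this with an infinite loop that pauses for 2 seconds at the end of each iteration. It works, but I can't help feeling that this isn't a very efficient way to poll the queue constantly.
Code example: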
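```python
import json
import time

import boto3

sqs = boto3.client('sqs')
# queue_url and process() are defined elsewhere in the script

while True:
    response = sqs.receive_message(
        QueueUrl=queue_url,
        AttributeNames=[
            'SentTimestamp'
        ],
        MaxNumberOfMessages=1,
        MessageAttributeNames=[
            'All'
        ],
        VisibilityTimeout=1,
        WaitTimeSeconds=1
    )
    try:
        message = response['Messages'][0]
        receipt_handle = message['ReceiptHandle']
        # Delete received message from queue
        sqs.delete_message(
            QueueUrl=queue_url,
            ReceiptHandle=receipt_handle
        )
        msg = message['Body']
        # Parse the JSON body (eval() would execute arbitrary code taken from the queue)
        msg_json = json.loads(msg)
        value1 = msg_json['value1']
        value2 = msg_json['value2']
        process(value1, value2)
    except Exception:
        # e.g. KeyError when no message arrived ('Messages' is missing);
        # unlike a bare except, this does not swallow KeyboardInterrupt
        pass
        #print('Queue empty')
    time.sleep(2)
```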
To exit the script cleanly (it is supposed to run continuously), I catch the KeyboardInterrupt raised by Ctrl+C and run some cleanup routines so that it exits gracefully:
```python
if __name__ == '__main__':
    try:
        main()
    except KeyboardInterrupt:
        logout()
```
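Catching KeyboardInterrupt only covers Ctrl+C. A process manager (systemd, `docker stop`, and so on) usually stops a service with SIGTERM, whose default action in Python terminates the process without running `finally` blocks or `except` handlers. One alternative is to let both signals set a flag that the loop checks between rounds, and to run cleanup in a `finally` block. A minimal sketch, assuming the work is split into a hypothetical `poll_once()` (one receive-and-process round) and a cleanup function such as the asker's `logout()`; `StopFlag` and `run` are illustrative names, not part of any library:

```python
import signal


class StopFlag:
    """Set from a signal handler; the polling loop checks it between rounds."""

    def __init__(self):
        self.stopped = False

    def request_stop(self, signum, frame):
        # Signal handlers receive the signal number and the current stack frame
        self.stopped = True

    def install(self):
        # signal.signal() must be called from the main thread
        signal.signal(signal.SIGINT, self.request_stop)   # Ctrl+C
        signal.signal(signal.SIGTERM, self.request_stop)  # e.g. systemd / docker stop


def run(poll_once, cleanup, stop):
    """Call poll_once() until a stop is requested; always run cleanup() afterwards."""
    try:
        while not stop.stopped:
            poll_once()
    finally:
        cleanup()
```

Usage would look like `stop = StopFlag(); stop.install(); run(poll_once, logout, stop)`. Because the handler only sets a flag, a message that is being processed when the signal arrives is finished first, and the loop exits once the current receive call returns.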
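Is there a better way to poll an SQS queue continuously, and is the 2-second delay necessary? I'm trying not to hammer the SQS service, but maybe that doesn't matter?
That's ultimately how SQS works: something has to poll it to get messages. But a few suggestions:
Don't receive just one message at a time. Do something more like this: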
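```python
import logging
import boto3

logger = logging.getLogger(__name__)
# receive_messages() is a method of the boto3 Queue resource (queue_url is your queue's URL)
queue = boto3.resource('sqs').Queue(queue_url)

messages = queue.receive_messages(
    MessageAttributeNames=['All'],
    MaxNumberOfMessages=10,
    WaitTimeSeconds=10
)
for msg in messages:
    logger.info("Received message: %s: %s", msg.message_id, msg.body)
```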
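This changes a couple of things for you. First, you are willing to receive up to 10 messages (the most SQS will return in a single call). Second, you will wait up to 10 seconds for a message to arrive. From the SQS documentation: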
The duration (in seconds) for which the call waits for a message to arrive in the queue before returning. If a message is available, the call returns sooner than WaitTimeSeconds. If no messages are available and the wait time expires, the call returns successfully with an empty list of messages.
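So you don't need your own `sleep` call: if there are no messages, the call waits until the wait time expires. Conversely, if you have lots of messages, you'll get all of them as fast as possible, because you aren't calling your own `sleep` in the code.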
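Putting these suggestions together, the asker's loop can drop `time.sleep(2)` entirely. On an idle queue the original loop makes roughly one request every 3 seconds (a 1-second wait plus a 2-second sleep), about 28,800 requests per day, whereas a 20-second long poll (the maximum `WaitTimeSeconds` SQS accepts) makes about 4,320. A minimal sketch, assuming JSON message bodies like the asker's (parsed with `json.loads` rather than `eval`) and a hypothetical `handle` callback standing in for `process(value1, value2)`; `poll_forever` is an illustrative name, and the client is passed in so the loop can be exercised without AWS:

```python
import json


def poll_forever(sqs_client, queue_url, handle, should_stop=lambda: False):
    """Long-poll an SQS queue with no time.sleep().

    receive_message blocks for up to WaitTimeSeconds while the queue is
    empty and returns as soon as messages arrive.
    """
    while not should_stop():
        response = sqs_client.receive_message(
            QueueUrl=queue_url,
            MaxNumberOfMessages=10,  # per-call maximum
            WaitTimeSeconds=20,      # long-polling maximum
        )
        # boto3 omits the 'Messages' key when the wait expired with nothing to return
        for message in response.get('Messages', []):
            handle(json.loads(message['Body']))
            # Delete only after successful processing, so a crash leaves the
            # message in the queue to be redelivered after its visibility timeout
            sqs_client.delete_message(
                QueueUrl=queue_url,
                ReceiptHandle=message['ReceiptHandle'],
            )
```

In real use this would be called as `poll_forever(boto3.client('sqs'), queue_url, handle)`. Because messages are deleted after processing, the visibility timeout should be longer than processing takes (the asker's `VisibilityTimeout=1` would let slow messages reappear); an exception from `handle` stops the loop and leaves that message in the queue.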
Adding on to @stdunbar's answer:
As the documentation states, a call may return fewer messages than the `MaxNumberOfMessages` you pass in; that was the case for me.
MaxNumberOfMessages (integer) -- The maximum number of messages to return. Amazon SQS never returns more messages than this value (however, fewer messages might be returned). Valid values: 1 to 10. Default: 1.
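So I built the following solution for reading from an SQS dead-letter queue, which keeps calling `receive_message` until the queue returns nothing: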
```python
import os

import boto3


def read_dead_letter_queue():
    """ This function is responsible for reading the Query Execution IDs related to insertions on the Athena Query Engine
    that we weren't able to deal with in the Source Queue.
    Args:
        None
    Returns:
        Dictionary: consisting of execution_ids_list, mssg_receipt_handle_list and queue_url for the messages in the
        Dead-Letter-Queue related to the insertion operation into the Athena Query Engine.
    """
    try:
        sqs_client = boto3.client('sqs')
        queue_url = os.environ['DEAD_LETTER_QUEUE_URL']
        execution_ids_list = list()
        mssg_receipt_handle_list = list()
        final_dict = {}
        # You can change the range stop number to whatever suits your scenario: just use a number larger than the
        # number of messages that may be in the Queue (e.g. 1 thousand or 1 million), because the loop breaks out
        # as soon as no messages are left in the Queue, before reaching the end of the range.
        for mssg_counter in range(1, 20, 1):
            sqs_response = sqs_client.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=10,
                WaitTimeSeconds=10
            )
            print(f"This is the dead-letter-queue response --> {sqs_response}")
            # The 'Messages' key is absent when the receive call returned nothing
            messages = sqs_response.get('Messages', [])
            if not messages:
                print("Breaking out of the loop, as there isn't any message left in the Queue.")
                break
            for mssg in messages:
                print(f"This is the message body --> {mssg['Body']}")
                print(f"This is the message ID --> {mssg['MessageId']}")
                execution_ids_list.append(mssg['Body'])
                mssg_receipt_handle_list.append(mssg['ReceiptHandle'])
        print(f"This is the execution_ids_list contents --> {execution_ids_list}")
        print(f"This is the mssg_receipt_handle_list contents --> {mssg_receipt_handle_list}")
        # We return the ReceiptHandle to be able to delete the message after we read it, in another function that's responsible for deletion.
        # We return a dictionary consisting of --> {execution_ids_list: ['query_exec_id'], mssg_receipt_handle_list: ['ReceiptHandle']}
        final_dict['execution_ids_list'] = execution_ids_list
        final_dict['mssg_receipt_handle_list'] = mssg_receipt_handle_list
        final_dict['queue_url'] = queue_url
        #TODO: We need to delete the messages after we finish reading, in another function that will delete messages for both the DLQ and the Source Queue.
        return final_dict
    except Exception as ex:
        print(f"read_dead_letter_queue Function Exception: {ex}")
```
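The deletion step left as a TODO in `read_dead_letter_queue()` could use `delete_message_batch`, which accepts at most 10 entries per request. Deleting promptly matters: a received message that is not deleted becomes visible in the queue again once its visibility timeout expires. A minimal sketch, assuming `receipt_handles` is the `mssg_receipt_handle_list` returned by that function; `delete_messages` is a hypothetical helper name, and the client is passed in so it works with `boto3.client('sqs')`:

```python
def delete_messages(sqs_client, queue_url, receipt_handles):
    """Delete received messages in batches of up to 10 (the DeleteMessageBatch limit).

    Returns the receipt handles that SQS reported as failed.
    """
    failed = []
    for start in range(0, len(receipt_handles), 10):
        chunk = receipt_handles[start:start + 10]
        # Entry Ids only need to be unique within a single batch request
        entries = [{'Id': str(i), 'ReceiptHandle': handle} for i, handle in enumerate(chunk)]
        response = sqs_client.delete_message_batch(QueueUrl=queue_url, Entries=entries)
        # A batch can partially succeed; failures are listed per entry, not raised
        for failure in response.get('Failed', []):
            failed.append(chunk[int(failure['Id'])])
    return failed
```

For example: `result = read_dead_letter_queue()` followed by `delete_messages(boto3.client('sqs'), result['queue_url'], result['mssg_receipt_handle_list'])`. Returning the failed handles, rather than raising, lets the caller decide whether to retry them.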