How do I "create"/"assign" a logging handler for Google Cloud Pubsub?
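Developments in a previous thread showed that the assumption behind my original question was a red herring (subprocess was not actually what caused the problem), so I'm making a more focused post.
My error message:
No handlers could be found for logger "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager"
My intent:
Pass Google Pub/Sub message attributes as Python variables so I can reuse them later in my code.
My code: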
import time
import logging
from google.cloud import pubsub_v1
project_id = "redacted"
subscription_name = "redacted"
def receive_messages_with_custom_attributes(project_id, subscription_name):
    """Receives messages from a pull subscription."""
    # [START pubsub_subscriber_sync_pull_custom_attributes]
    subscriber = pubsub_v1.SubscriberClient()
    subscription_path = subscriber.subscription_path(
        project_id, subscription_name)
    def callback(message):
        print('Received message: {}'.format(message.data))
        if message.attributes:
            #print('Attributes:')
            for key in message.attributes:
                value = message.attributes.get(key)
                #commented out to not print to terminal
                #which should not be necessary
                #print('{}: {}'.format(key, value))
        message.ack()
        print("this is before variables")
        dirpath = "~/subfolder1/"
        print(dirpath)
        namepath = message.data["name"]
        print(namepath)
        fullpath = dirpath + namepath
        print(fullpath)
        print("this is after variables")
    subscriber.subscribe(subscription_path, callback=callback)
    # The subscriber is non-blocking, so we must keep the main thread from
    # exiting to allow it to process messages in the background.
    print('Listening for messages on {}'.format(subscription_path))
    while True:
        time.sleep(60)
    # [END pubsub_subscriber_sync_pull_custom_attributes]
receive_messages_with_custom_attributes(project_id, subscription_name)
My full console output from running the code above:
Listening for messages on projects/[redacted]
Received message: {
"kind": "storage#object",
"id": "[redacted]/0.testing/1548033442364022",
"selfLink": "https://www.googleapis.com/storage/v1/b/[redacted]/o/BSD%2F0.testing",
"name": "BSD/0.testing",
"bucket": "[redacted]",
"generation": "1548033442364022",
"metageneration": "1",
"contentType": "application/octet-stream",
"timeCreated": "2019-01-21T01:17:22.363Z",
"updated": "2019-01-21T01:17:22.363Z",
"storageClass": "MULTI_REGIONAL",
"timeStorageClassUpdated": "2019-01-21T01:17:22.363Z",
"size": "0",
"md5Hash": "1B2M2Y8AsgTpgAmY7PhCfg==",
"mediaLink": "https://www.googleapis.com/download/storage/v1/b/[redacted]/o/BSD%2F0.testing?generation=1548033442364022&alt=media",
"crc32c": "AAAAAA==",
"etag": "CPb0uvvZ/d8CEAE="
}
this is before variables
/home/[redacted]
No handlers could be found for logger "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager"
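As you can see, the first string and the string defined as a variable are printed, but the code breaks when it tries to define a variable from the dictionary that was just generated, and no further print()s are executed.
In another thread, the user was running cron jobs and found the fix in their crontab environment paths. My case involves receiving messages and doesn't use any cron jobs, but could this hint at another layer behind or within Python?
Can anyone help me add a handler so this code runs as expected?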
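A minimal sketch of what attaching a handler looks like, assuming Python 2: there, the logging module prints "No handlers could be found for logger ..." the first time a library logs a record while no handler is configured (Python 3 instead falls back to a last-resort stderr handler for warnings and errors). As far as I know, the subscriber's streaming pull manager catches exceptions raised inside your callback and reports them through that module logger, so configuring the root logger once at startup should print the real traceback instead of the generic message. The logger below only stands in for the library's, and the log message text is illustrative, not the library's actual wording.

```python
import logging

# Attach a stderr handler to the root logger. Records from library loggers
# (e.g. "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager")
# propagate up to it, so their messages and tracebacks get printed.
# Call this once at startup, before creating the SubscriberClient.
logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(name)s %(levelname)s: %(message)s",
)

# Stand-in for the library's module logger (same name as in the error).
pubsub_logger = logging.getLogger(
    "google.cloud.pubsub_v1.subscriber._protocol.streaming_pull_manager")


def callback_body(data):
    # Same mistake as in the question: indexing bytes with a str key.
    return data["name"]


try:
    callback_body(b'{"name": "BSD/0.testing"}')
except TypeError:
    # logger.exception() logs at ERROR level and appends the traceback,
    # which reveals the TypeError hidden behind "No handlers could be found".
    pubsub_logger.exception("Exception in callback (illustrative message)")
```

With a handler in place, the traceback shows the actual error in the callback rather than the missing-handler notice.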
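First, if I understand what your output shows correctly, you are using Pub/Sub notifications to send a message whenever you make a change to a Cloud Storage object. This information may be helpful.
Now, message.data["name"] will not work, because message.data is a bytes object and therefore can't be indexed like a dictionary.
To treat it as a dictionary, you first have to decode it from base64 (import base64). What remains is a string that looks like JSON. Then use json.loads() (don't forget to import json) to convert this string into a dictionary. Now you can index the message.
The code for this is: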
print("This is before variables")
dirpath = "/subfolder1/"
print(dirpath)
#Transform the bytes object into a string by decoding it
namepath = base64.b64decode(message.data).decode('utf-8')
#Transform the JSON-formatted string into a dict
namepath = json.loads(namepath)
print(namepath["name"])
fullpath = dirpath + namepath["name"]
print(fullpath)
print("this is after variables")
Now, if you only want to read the attributes, they are correctly defined at the top like this:
if message.attributes:
    print('Attributes:')
    for key in message.attributes:
        value = message.attributes.get(key)
        print('{}: {}'.format(key, value))
So you can use:
print("this is before variables")
dirpath = "~/subfolder1/"
print(dirpath)
namepath = message.attributes["objectId"]
print(namepath)
fullpath = dirpath + namepath
print(fullpath)
print("this is after variables")
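Keep in mind that in this particular case "objectId" is the name of the file, because it is the attribute that Cloud Storage notifications sent through Pub/Sub use. If you intend to send custom messages, change "objectId" to the name of the attribute you want.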
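A slightly more defensive variant of the snippet above, as a sketch: it uses a plain dict as a stand-in for message.attributes (the bucket name is made up), returns None when "objectId" is missing (for example, a custom message that uses another attribute name), and expands "~" with os.path.expanduser, because concatenating "~/subfolder1/" leaves a literal tilde that Python's file functions such as open() do not resolve. The function name build_local_path and its base_dir default are hypothetical.

```python
import os

# Stand-in for message.attributes on a Cloud Storage notification.
# "objectId" holds the object name; the bucket name here is made up.
attributes = {"bucketId": "example-bucket", "objectId": "BSD/0.testing"}


def build_local_path(attributes, base_dir="~/subfolder1"):
    """Return a local path for the notified object, or None if absent."""
    object_id = attributes.get("objectId")
    if object_id is None:
        # Not a Cloud Storage notification, or a custom message that
        # uses a different attribute name.
        return None
    # open() and friends do not expand "~", so expand it explicitly.
    return os.path.join(os.path.expanduser(base_dir), object_id)


print(build_local_path(attributes))
print(build_local_path({}))  # None
```

Returning None instead of raising lets the callback skip messages it doesn't understand without the subscriber logging an exception.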
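As Nahuel and tripleee explained, the problem is that the message is bytes rather than a string. However, their code didn't fully work and still threw errors, and I don't know why. By cross-referencing Google's Pub/Sub App Engine sample code and after a few hours of trial and error, I found that the following code works. It may be inelegant and/or use bad practices; if so, please edit it and make it more robust.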
#Continues from after message.ack(); the code above remains unchanged,
#except that it needs <import json>
#this turns message.data into a true Python dict with string values
payload = json.loads(message.data.decode('utf-8'))
#this finds the value of the dict with key "name"
namepath = payload["name"]
#this is just a static string to prepend to the file path
dirpath = "/home/[redacted]/"
#combine them into a single functioning path
fullpath = dirpath + namepath
#(Python 2) currently type 'unicode', so encode it to type 'str'
fullpath = fullpath.encode("utf-8")
In the end we have the full path as a plain 'str' for later functions/commands to use.
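A likely explanation for the errors with the base64 step: the google-cloud-pubsub Python client hands the callback message.data as the raw payload bytes (base64 only appears in the REST/JSON representation of a message), so running base64.b64decode over JSON text either raises or yields garbage. The sketch below is written for Python 3 and uses a trimmed copy of the notification body from the question as stand-in data; it contrasts the two approaches and wraps the working one in error handling. parse_payload and parse_with_extra_b64 are hypothetical names.

```python
import base64
import json

# Trimmed copy of the Cloud Storage notification body from the question,
# standing in for message.data (raw JSON bytes).
data = b'{"kind": "storage#object", "name": "BSD/0.testing", "size": "0"}'


def parse_payload(raw):
    """Decode UTF-8 bytes and parse them as JSON; return None on bad input."""
    try:
        return json.loads(raw.decode("utf-8"))
    except ValueError:
        # UnicodeDecodeError and json's decode errors both subclass ValueError.
        return None


def parse_with_extra_b64(raw):
    """The extra base64 step; only valid if the payload were base64 text."""
    return json.loads(base64.b64decode(raw).decode("utf-8"))


payload = parse_payload(data)
print(payload["name"])  # BSD/0.testing

try:
    parse_with_extra_b64(data)
except ValueError as exc:  # binascii.Error is also a ValueError subclass
    print("base64 step failed: {}".format(type(exc).__name__))
```

In the real callback you would call parse_payload(message.data) and skip (or log) the message when it returns None, instead of letting an exception escape into the subscriber.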