GCP Nodejs8 Cloud Function - 同步发布订阅
GCP Nodejs8 Cloud Function - Synchronous PubSub publish
我正在努力使用 javascript/Nodejs8 Google 云函数将有效负载发布到 Google PubSub。
所以我有一个由 HTTP 请求触发的 Cloud Function,然后将请求正文发布到 pubsub 主题(配置为拉模式)。
这是我的代码:
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-fancy-topic');
function formatPubSubMessage(reqObj){
// the body is pure text
return Buffer.from(reqObj.body);
};
exports.entryPoint = function validate(req, res) {
topic.publish(formatPubSubMessage(req)).then((messageId) => {
console.log("sent pubsub message with id :: " + messageId)
});
res.status(200).json({"res":"OK"});
};
我的问题是云函数在发布 pubsub 消息之前完成执行(在日志中,日志 "Function execution took X ms, finished with status code: 200" 出现在我的 pubsub 日志之前大约 30 或 40 秒。我也有几次使用 "Ignoring exception from a finished function" 登录,但我没有收到我的 pubsub 日志)
我不是 javascript 或 nodejs 专家,我也不精通 javascript 承诺,但我想知道是否可以同步发布。我也在想我可能在这里做错了什么!
预先感谢您的帮助。
现在,此代码正在发送响应,然后 发布完成。发送响应后,函数终止,正在进行的异步工作可能无法完成。
您应该做的是仅在发布完成后才发送响应,这意味着将该行代码放在 then
回调中。
exports.entryPoint = function validate(req, res) {
topic.publish(formatPubSubMessage(req)).then((messageId) => {
console.log("sent pubsub message with id :: " + messageId)
res.status(200).json({"res":"OK"});
});
};
我建议花一些时间了解 promises 的工作原理,因为这对于构建正确工作的函数至关重要。
在您的逻辑中,您的回调/事件处理函数在 HTTP 消息到达时被调用。然后执行 publish() 函数。执行发布是异步的 activity。这意味着发布需要一些时间才能完成,并且由于 JavaScript(本质上)不想阻止,它会立即 return 并承诺您可以在以下时间收到通知异步工作已经完成。在执行 publish() 之后,您的逻辑会立即执行 res.status(....) ,它会向 HTTP 请求发送响应,这确实是来自 HTTP 客户端的流请求的结尾。异步发布仍在进行中,当它本身完成时,发布的回调就会发生,您会记录一个响应。
不幸的是,这不是 Google 此处记录的好做法 ...
https://cloud.google.com/functions/docs/bestpractices/tips#do_not_start_background_activities
在最后一个故事中,您调用 validate
的函数仍将在发布完成之前结束。如果你想在 publish() 执行时阻塞(有效地使其同步),你可以使用 JavaScript await
关键字。松散地,类似于:
try {
let messageId = await topic.publish(....);
console.log(...);
catch(e) {
...
}
您还需要将函数标记为 async
。例如:
exports.entryPoint = async function validate(req, res) {
...
参见:https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function
您也可以简单地 return 函数中的 Promise,回调函数将不会被视为已解决,直到整个 Promise 被解决。
底线是深入研究Promises。
我正在努力使用 javascript/Nodejs8 Google 云函数将有效负载发布到 Google PubSub。
所以我有一个由 HTTP 请求触发的 Cloud Function,然后将请求正文发布到 pubsub 主题(配置为拉模式)。
这是我的代码:
const {PubSub} = require('@google-cloud/pubsub');
const pubsub = new PubSub();
const topic = pubsub.topic('my-fancy-topic');
function formatPubSubMessage(reqObj){
// the body is pure text
return Buffer.from(reqObj.body);
};
exports.entryPoint = function validate(req, res) {
topic.publish(formatPubSubMessage(req)).then((messageId) => {
console.log("sent pubsub message with id :: " + messageId)
});
res.status(200).json({"res":"OK"});
};
我的问题是云函数在发布 pubsub 消息之前完成执行(在日志中,日志 "Function execution took X ms, finished with status code: 200" 出现在我的 pubsub 日志之前大约 30 或 40 秒。我也有几次使用 "Ignoring exception from a finished function" 登录,但我没有收到我的 pubsub 日志)
我不是 javascript 或 nodejs 专家,我也不精通 javascript 承诺,但我想知道是否可以同步发布。我也在想我可能在这里做错了什么!
预先感谢您的帮助。
现在,此代码正在发送响应,然后 发布完成。发送响应后,函数终止,正在进行的异步工作可能无法完成。
您应该做的是仅在发布完成后才发送响应,这意味着将该行代码放在 then
回调中。
exports.entryPoint = function validate(req, res) {
topic.publish(formatPubSubMessage(req)).then((messageId) => {
console.log("sent pubsub message with id :: " + messageId)
res.status(200).json({"res":"OK"});
});
};
我建议花一些时间了解 promises 的工作原理,因为这对于构建正确工作的函数至关重要。
在您的逻辑中,您的回调/事件处理函数在 HTTP 消息到达时被调用。然后执行 publish() 函数。执行发布是异步的 activity。这意味着发布需要一些时间才能完成,并且由于 JavaScript(本质上)不想阻止,它会立即 return 并承诺您可以在以下时间收到通知异步工作已经完成。在执行 publish() 之后,您的逻辑会立即执行 res.status(....) ,它会向 HTTP 请求发送响应,这确实是来自 HTTP 客户端的流请求的结尾。异步发布仍在进行中,当它本身完成时,发布的回调就会发生,您会记录一个响应。
不幸的是,这不是 Google 此处记录的好做法 ...
https://cloud.google.com/functions/docs/bestpractices/tips#do_not_start_background_activities
在最后一个故事中,您调用 validate
的函数仍将在发布完成之前结束。如果你想在 publish() 执行时阻塞(有效地使其同步),你可以使用 JavaScript await
关键字。松散地,类似于:
try {
let messageId = await topic.publish(....);
console.log(...);
catch(e) {
...
}
您还需要将函数标记为 async
。例如:
exports.entryPoint = async function validate(req, res) {
...
参见:https://developer.mozilla.org/en-US/docs/Web/JavaScript/Reference/Statements/async_function
您也可以简单地 return 函数中的 Promise,回调函数将不会被视为已解决,直到整个 Promise 被解决。
底线是深入研究Promises。