使用 Promise.all 的异步等待和瓶颈速率限制
Async-Await & Bottleneck Rate Limiting using Promise.all
我使用的是 API,它的速率限制为 500 个请求/分钟。
因此我决定使用 bottleneck。但是我需要执行异步函数数组,这些函数生成一个 Promise 来进行 API 调用。我不确定我是否走对了路。因为 API 用 "Exceeded rate limit of 83 in 10_seconds" 回复我,我在 10 秒内只发送了 70 个请求。
下面是我调用主函数的方式:
const result = await Helper.updateUsers(request.query.where);
..
..
这是helper.js
const Boom = require("boom");
const mongoose = require("mongoose");
const Bottleneck = require("bottleneck");
const Intercom = require("intercom-client");
const config = require("../../config/config");
const client = new Intercom.Client({
token: config.intercom.access_token
});
const User = mongoose.model("User");
const Shop = mongoose.model("Shop");
// create a rate limiter that allows up to 70 API calls per 10 seconds,
// with max concurrency of 70
const limiter = new Bottleneck({
maxConcurrent: 70,
minTime: 10000
});
// Helpers
// This function prepares a valid Intercom User Object.
// user -> User Object
// returns <Promise>
const prepareAndUpdateUser = async user => {
try {
let userData = {
email: user.email,
user_id: user._id,
companies: []
};
Shop.find({ _id: { $in: user.account.shops } })
.exec((err, shops) => {
if (err) console.log("INTERCOM UPDATE USER", err);
shops.forEach(shop => {
let shopData = {
company_id: shop._id,
name: shop.name[shop.defaultLanguage.code]
};
userData.companies.push(shopData);
});
// Update Intercom Promise
return client.users.create(userData);
});
} catch (e) {
return Boom.boomify(err);
}
};
module.exports.updateUsers = async query => {
try {
const users = await User.find(query)
.populate("account")
.limit(700);
if (users && users.length > 0) {
limiter.schedule(() => {
const allTasks = users.map(
async user => await prepareAndUpdateUser(user)
);
return Promise.all(allTasks);
});
return users.length;
} else {
return 0;
}
} catch (err) {
return Boom.boomify(err);
}
};
我使用 Bottleneck 和 Async-Await 是否正确?
首先要指出的是您在 async
方法中使用回调而不是 await
承诺。您应该使用 Shops.find()
的承诺返回版本和 await
结果。
async function prepareAndUpdateUser(user) {
try {
const shops = await Shop.find({ _id: { $in: user.account.shops } }).exec();
return client.users.create({
email: user.email,
user_id: user._id,
companies: shops.map(shop => {
return {
company_id: shop._id,
name: shop.name[shop.defaultLanguage.code]
};
})
});
} catch (e) {
return Boom.boomify(err);
}
}
在您的 updateUsers
方法中,您反向使用了速率限制器。您希望将用户映射到速率限制器中,以便它可以控制何时调用 prepareAndUpdateUser
,目前您将并行请求所有内容。您还希望等待速率限制器返回的承诺得到解决。本质上,您需要将 limiter.scehdule(...)
移动到 user.map(...)
.
async function updateUsers(query) {
try {
const users = await User.find(query)
.populate("account")
.limit(700);
if (users && users.length > 0) {
// Schedule an update for each user
const allTasks = users.map(user => {
// Schedule returns a promise that resolves when the operation is complete
return limiter.schedule(() => {
// This method is called when the scheduler is ready for it
return prepareAndUpdateUser(user)
})
});
// Wait for all the scheduled tasks to complete
await Promise.all(allTasks);
return users.length;
} else {
return 0;
}
} catch (err) {
return Boom.boomify(err);
}
}
我使用的是 API,它的速率限制为 500 个请求/分钟。 因此我决定使用 bottleneck。但是我需要执行异步函数数组,这些函数生成一个 Promise 来进行 API 调用。我不确定我是否走对了路。因为 API 用 "Exceeded rate limit of 83 in 10_seconds" 回复我,我在 10 秒内只发送了 70 个请求。
下面是我调用主函数的方式:
const result = await Helper.updateUsers(request.query.where);
..
..
这是helper.js
const Boom = require("boom");
const mongoose = require("mongoose");
const Bottleneck = require("bottleneck");
const Intercom = require("intercom-client");
const config = require("../../config/config");
const client = new Intercom.Client({
token: config.intercom.access_token
});
const User = mongoose.model("User");
const Shop = mongoose.model("Shop");
// create a rate limiter that allows up to 70 API calls per 10 seconds,
// with max concurrency of 70
const limiter = new Bottleneck({
maxConcurrent: 70,
minTime: 10000
});
// Helpers
// This function prepares a valid Intercom User Object.
// user -> User Object
// returns <Promise>
const prepareAndUpdateUser = async user => {
try {
let userData = {
email: user.email,
user_id: user._id,
companies: []
};
Shop.find({ _id: { $in: user.account.shops } })
.exec((err, shops) => {
if (err) console.log("INTERCOM UPDATE USER", err);
shops.forEach(shop => {
let shopData = {
company_id: shop._id,
name: shop.name[shop.defaultLanguage.code]
};
userData.companies.push(shopData);
});
// Update Intercom Promise
return client.users.create(userData);
});
} catch (e) {
return Boom.boomify(err);
}
};
module.exports.updateUsers = async query => {
try {
const users = await User.find(query)
.populate("account")
.limit(700);
if (users && users.length > 0) {
limiter.schedule(() => {
const allTasks = users.map(
async user => await prepareAndUpdateUser(user)
);
return Promise.all(allTasks);
});
return users.length;
} else {
return 0;
}
} catch (err) {
return Boom.boomify(err);
}
};
我使用 Bottleneck 和 Async-Await 是否正确?
首先要指出的是您在 async
方法中使用回调而不是 await
承诺。您应该使用 Shops.find()
的承诺返回版本和 await
结果。
async function prepareAndUpdateUser(user) {
try {
const shops = await Shop.find({ _id: { $in: user.account.shops } }).exec();
return client.users.create({
email: user.email,
user_id: user._id,
companies: shops.map(shop => {
return {
company_id: shop._id,
name: shop.name[shop.defaultLanguage.code]
};
})
});
} catch (e) {
return Boom.boomify(err);
}
}
在您的 updateUsers
方法中,您反向使用了速率限制器。您希望将用户映射到速率限制器中,以便它可以控制何时调用 prepareAndUpdateUser
,目前您将并行请求所有内容。您还希望等待速率限制器返回的承诺得到解决。本质上,您需要将 limiter.scehdule(...)
移动到 user.map(...)
.
async function updateUsers(query) {
try {
const users = await User.find(query)
.populate("account")
.limit(700);
if (users && users.length > 0) {
// Schedule an update for each user
const allTasks = users.map(user => {
// Schedule returns a promise that resolves when the operation is complete
return limiter.schedule(() => {
// This method is called when the scheduler is ready for it
return prepareAndUpdateUser(user)
})
});
// Wait for all the scheduled tasks to complete
await Promise.all(allTasks);
return users.length;
} else {
return 0;
}
} catch (err) {
return Boom.boomify(err);
}
}