node.js - 控制 Promise 队列
node.js - Control a queue of Promises
我正在编写一个爬虫程序,它将使用 node.js 从电子商务网站获取数据。我要获取的每个输入都包含:
url
:URL 个 link
directory
: 稍后写入输出文件的目录名
page
: 查询参数
每个页面都会抓取一些项目,稍后会详细抓取每个项目
这是我的 fetchPage
承诺(agent
是 require('superagent')
),它将获取 HTML 文本:
function fetchPage(url,page){
return new Promise(
(resolve,reject)=>{
if (page>0){
agent
.get(url)
.send('page='+page)
.end(function(err,res){
if (err){
reject(err);
} else{
resolve(res.text);
}
});
} else{
agent
.get(url)
.end(function(err,res){
if (err){
reject(err);
} else{
resolve(res.text);
}
});
}
});
}
全球通话:
var data=[];
for (var i=1;i<=links[0].numOfPages;i++){
data.push({
url:links[0].url,
directory:links[0].directory,
page:i
});
}
const promises=data.reduce(
(promise,data)=>promise.then(()=>{
fetchPage(data.url,data.page).then(
(result)=>{
const urls=getUrls(result);
Promise.all(urls.map((url,i)=>fetchPage(url,0).then(
(result)=>{
var item=getItem(result);
item.url=url;
writeItem(item,data.directory,data.page,i+1);
},
(error)=>console.log(error)
)));
});
}),
Promise.resolve());
promises.then((values)=>console.log('All done'));
您将看到 3 个功能作为实用程序(它们都可以正常工作):
getUrls
: 处理页面的HTML文本,返回一个url数组
稍后要详细抓取的项目
getItem
: 处理 HTML 文本
item的详细页面,返回一个要写入的对象
文件
writeItem
: 将对象写入文件,提供目录
和页码以创建正确的目录并写入和存储
我一直遇到一个问题:
- 我如何使用承诺队列重建它,其中每个
promise会运行一个接一个有序的
同步并且只允许有限数量的承诺 运行ning concurrently?
如何正确有效地做到这一点?我应该如何更改这些当前代码?我也需要一些演示
我删除了fetchItem
函数因为它不需要(实际上,它用page = 0
调用fetchPage
),现在我只使用fetchPage
首先,如果你想真正控制你的执行,那么你不应该构造一个循环来调用一个promise。它将立即执行。相反,您应该构造一些数据以提供给承诺。抱歉,我不太了解您的程序流程。我可以看到您正在调用 fetchPage
并在它完成后调用 fetchItem
,它再次调用 fetchPage
。这可能就是您收到双倍回调的原因。
关于您的第二个问题,这里有一个示例,说明如何串行处理每个 link,并在最多 3 个并发作业的情况下并行处理 link 中的页面。
var Promise = require('bluebird');
var chance = new (require('chance'))();
var fetchPage = (url, page) => new Promise((resolve, reject) => {
// Simulate Network Operation
if (page === 0) {
console.log('Start Downloading: ' + url);
setTimeout(() => {
resolve({
url: url,
content: 'Content of ' + url
});
}, chance.integer({ min: 10, max: 250 }));
} else {
console.log('Start Downloading: ' + url + '?page=' + page);
setTimeout(() => {
resolve({
url: url + '?page=' + page,
content: 'Content of ' + url + '?page=' + page
});
}, chance.integer({ min: 10, max: 250 }));
}
});
var fetchItem = link => {
// Get the data to be supplied to fetchPage promise
var data = [];
for (var i = 0; i <= link.numOfPages; i++) {
data.push({
url: link.url,
page: i
});
}
return data;
};
var writeItem = (item, directory) => {
// Simulate Writing to Directory
console.log('Writing ' + item + ' to ' + directory + ' folder');
};
// Make some dummy links
var links = [];
for (var i = 0; i < 10; i++) {
var domain = chance.domain();
links.push({
url: chance.url({ domain: domain }),
directory: domain,
numOfPages: chance.integer({ min: 0, max: 5 })
});
}
// Process each URL serially
Promise.each(links, link => Promise.map(fetchItem(link), data => fetchPage(data.url, data.page).then(result => {
writeItem(result.content, link.directory);
console.log('Done Fetching: ' + result.url);
}), {
// Control the number of concurrent job
concurrency: 3
})).then(() => {
console.log('All Done!!');
});
更新: 一个更简单的例子来演示 Promise.each
和 Promise.map
var Promise = require('bluebird');
var chance = new (require('chance'))();
var tasks = [];
for (var i = 1; i <= chance.integer({ min: 10, max: 20 }); i++) {
var jobs = [];
for (var j = 1; j <= chance.integer({ min: 2, max: 10 }); j++) {
jobs.push({
job_name: 'Job ' + j
});
}
tasks.push({
task_name: 'Task ' + i,
jobs: jobs
});
}
Promise.each(tasks, task => Promise.map(task.jobs, job => new Promise((resolve, reject) => {
setTimeout(() => resolve(task.task_name + ' ' + job.job_name), chance.integer({ min: 20, max: 150 }));
}).then(log => console.log(log)), {
concurrency: 3
}).then(() => console.log())).then(() => {
console.log('All Done!!');
});
在此示例中,您可以清楚地看到每个任务都是 运行 顺序执行的,任务中的每个作业都是 运行 并行的,一次最多有 3 个并发作业。
对于您的情况,我建议您安装 Bluebird Promise 库,因为它提供了一些您可以使用的实用程序。
对于您的问题,通常情况下,您不会将 for 循环与 Promises 结合使用,而是构造一个数据数组和一个 returns Promise 的映射函数,然后 .map() + Promise.all()
或 .reduce()
数组到单个 Promise 中,当一切都完成时解析。
Bluebird 的 Promise.map()
还允许您指定并发选项,这将限制可以同时 运行 的操作数。
以下是一些可以帮助您入门的示例:
运行 并发异步操作
const Promise = require('bluebird');
const urls = ['https://url1.com', 'https://url2.com', ... ]; // lots of urls
// {concurrency: 4} means only 4 URLs are processed at any given time.
const allPromise = Promise.map(urls, fetchUrlAsync, {concurrency: 4});
allPromise.then(allValues => {
// Deal with all results in order of original array
});
运行 顺序异步操作:
const Promise = require('bluebird');
const urls = ['https://url1.com', 'https://url2.com', ... ]; // lots of urls
// {concurrency: 4} means only 4 URLs are processed at any given time.
const allPromise = urls.reduce((promise, url) =>
// Start with an empty promise, chain all calls on top of that
promise.then(() => fetchUrlAsync(url)), Promise.resolve());
allPromise.then(allValues => {
// Deal with all results in order of original array
});
尝试将事物视为值的集合,以及您对这些值执行的操作,将您的操作抽象为函数,并在适当的时候调用它们,不要在同一个地方混合获取和写入。
我正在编写一个爬虫程序,它将使用 node.js 从电子商务网站获取数据。我要获取的每个输入都包含:
url
:URL 个 linkdirectory
: 稍后写入输出文件的目录名page
: 查询参数
每个页面都会抓取一些项目,稍后会详细抓取每个项目
这是我的 fetchPage
承诺(agent
是 require('superagent')
),它将获取 HTML 文本:
function fetchPage(url,page){
return new Promise(
(resolve,reject)=>{
if (page>0){
agent
.get(url)
.send('page='+page)
.end(function(err,res){
if (err){
reject(err);
} else{
resolve(res.text);
}
});
} else{
agent
.get(url)
.end(function(err,res){
if (err){
reject(err);
} else{
resolve(res.text);
}
});
}
});
}
全球通话:
var data=[];
for (var i=1;i<=links[0].numOfPages;i++){
data.push({
url:links[0].url,
directory:links[0].directory,
page:i
});
}
const promises=data.reduce(
(promise,data)=>promise.then(()=>{
fetchPage(data.url,data.page).then(
(result)=>{
const urls=getUrls(result);
Promise.all(urls.map((url,i)=>fetchPage(url,0).then(
(result)=>{
var item=getItem(result);
item.url=url;
writeItem(item,data.directory,data.page,i+1);
},
(error)=>console.log(error)
)));
});
}),
Promise.resolve());
promises.then((values)=>console.log('All done'));
您将看到 3 个功能作为实用程序(它们都可以正常工作):
getUrls
: 处理页面的HTML文本,返回一个url数组 稍后要详细抓取的项目getItem
: 处理 HTML 文本 item的详细页面,返回一个要写入的对象 文件writeItem
: 将对象写入文件,提供目录 和页码以创建正确的目录并写入和存储
我一直遇到一个问题:
- 我如何使用承诺队列重建它,其中每个 promise会运行一个接一个有序的 同步并且只允许有限数量的承诺 运行ning concurrently?
如何正确有效地做到这一点?我应该如何更改这些当前代码?我也需要一些演示
我删除了fetchItem
函数因为它不需要(实际上,它用page = 0
调用fetchPage
),现在我只使用fetchPage
首先,如果你想真正控制你的执行,那么你不应该构造一个循环来调用一个promise。它将立即执行。相反,您应该构造一些数据以提供给承诺。抱歉,我不太了解您的程序流程。我可以看到您正在调用 fetchPage
并在它完成后调用 fetchItem
,它再次调用 fetchPage
。这可能就是您收到双倍回调的原因。
关于您的第二个问题,这里有一个示例,说明如何串行处理每个 link,并在最多 3 个并发作业的情况下并行处理 link 中的页面。
var Promise = require('bluebird');
var chance = new (require('chance'))();
var fetchPage = (url, page) => new Promise((resolve, reject) => {
// Simulate Network Operation
if (page === 0) {
console.log('Start Downloading: ' + url);
setTimeout(() => {
resolve({
url: url,
content: 'Content of ' + url
});
}, chance.integer({ min: 10, max: 250 }));
} else {
console.log('Start Downloading: ' + url + '?page=' + page);
setTimeout(() => {
resolve({
url: url + '?page=' + page,
content: 'Content of ' + url + '?page=' + page
});
}, chance.integer({ min: 10, max: 250 }));
}
});
var fetchItem = link => {
// Get the data to be supplied to fetchPage promise
var data = [];
for (var i = 0; i <= link.numOfPages; i++) {
data.push({
url: link.url,
page: i
});
}
return data;
};
var writeItem = (item, directory) => {
// Simulate Writing to Directory
console.log('Writing ' + item + ' to ' + directory + ' folder');
};
// Make some dummy links
var links = [];
for (var i = 0; i < 10; i++) {
var domain = chance.domain();
links.push({
url: chance.url({ domain: domain }),
directory: domain,
numOfPages: chance.integer({ min: 0, max: 5 })
});
}
// Process each URL serially
Promise.each(links, link => Promise.map(fetchItem(link), data => fetchPage(data.url, data.page).then(result => {
writeItem(result.content, link.directory);
console.log('Done Fetching: ' + result.url);
}), {
// Control the number of concurrent job
concurrency: 3
})).then(() => {
console.log('All Done!!');
});
更新: 一个更简单的例子来演示 Promise.each
和 Promise.map
var Promise = require('bluebird');
var chance = new (require('chance'))();
var tasks = [];
for (var i = 1; i <= chance.integer({ min: 10, max: 20 }); i++) {
var jobs = [];
for (var j = 1; j <= chance.integer({ min: 2, max: 10 }); j++) {
jobs.push({
job_name: 'Job ' + j
});
}
tasks.push({
task_name: 'Task ' + i,
jobs: jobs
});
}
Promise.each(tasks, task => Promise.map(task.jobs, job => new Promise((resolve, reject) => {
setTimeout(() => resolve(task.task_name + ' ' + job.job_name), chance.integer({ min: 20, max: 150 }));
}).then(log => console.log(log)), {
concurrency: 3
}).then(() => console.log())).then(() => {
console.log('All Done!!');
});
在此示例中,您可以清楚地看到每个任务都是 运行 顺序执行的,任务中的每个作业都是 运行 并行的,一次最多有 3 个并发作业。
对于您的情况,我建议您安装 Bluebird Promise 库,因为它提供了一些您可以使用的实用程序。
对于您的问题,通常情况下,您不会将 for 循环与 Promises 结合使用,而是构造一个数据数组和一个 returns Promise 的映射函数,然后 .map() + Promise.all()
或 .reduce()
数组到单个 Promise 中,当一切都完成时解析。
Bluebird 的 Promise.map()
还允许您指定并发选项,这将限制可以同时 运行 的操作数。
以下是一些可以帮助您入门的示例:
运行 并发异步操作
const Promise = require('bluebird');
const urls = ['https://url1.com', 'https://url2.com', ... ]; // lots of urls
// {concurrency: 4} means only 4 URLs are processed at any given time.
const allPromise = Promise.map(urls, fetchUrlAsync, {concurrency: 4});
allPromise.then(allValues => {
// Deal with all results in order of original array
});
运行 顺序异步操作:
const Promise = require('bluebird');
const urls = ['https://url1.com', 'https://url2.com', ... ]; // lots of urls
// {concurrency: 4} means only 4 URLs are processed at any given time.
const allPromise = urls.reduce((promise, url) =>
// Start with an empty promise, chain all calls on top of that
promise.then(() => fetchUrlAsync(url)), Promise.resolve());
allPromise.then(allValues => {
// Deal with all results in order of original array
});
尝试将事物视为值的集合,以及您对这些值执行的操作,将您的操作抽象为函数,并在适当的时候调用它们,不要在同一个地方混合获取和写入。