排队异步任务

Queueing Asynchronous tasks

我正在尝试创建一个脚本,该脚本获取 URL 的列表,转到该站点并截取屏幕截图。

我已经设法让它与木偶操纵者一起工作。然而,我遇到的问题是当我在列表中说 50 URLs 时,它会尝试同时为所有这些启动 puppet 会话,这意味着大多数时间在网站加载之前超时并且它可以截图。

我发现我可以一次成功 运行 10 个,所以我想设置一个排队系统来执行此操作。

parser.on('readable', function(){
  while(record = parser.read()){
      counter +=1;
      console.log(record.URL);


      (async (url = record.URL, name = record.shortURL, counter1 = counter) => {
      const browser = await puppeteer.launch( {defaultViewport: {width: 1024, height:768} } );
      const page = await browser.newPage();
      await page.goto(url);
      title = await page.title();
      domainRegex = /^(?:https?:\/\/)?(?:[^@\n]+@)?(?:www\.)?([^:\/\n?]+)/img;
      match = domainRegex.exec(url);

      width = 1024;//await page.viewport().width;
      height = 1000;//await page.viewport.height();
      await page.screenshot({path: "Screenshots/"+counter1+". "+match[1] + "- " +title.replace(/[\W_]+/g,"")+".jpg", clip : {x:0, y:0, width: width, height: height}});

      await browser.close();    
      })();

  }
});

如果你想运行所有这些都是串行的,你可以把它变成一个异步函数并等待它。这样,就会运行一个接一个

// let's separate it for readability
async function getRecord(record, counter) {
    const url = record.URL,
        name = record.shortURL,
        counter1 = counter;
    const browser = await puppeteer.launch({
        defaultViewport: {
            width: 1024,
            height: 768
        }
    });
    const page = await browser.newPage();
    await page.goto(url);
    title = await page.title();
    domainRegex = /^(?:https?:\/\/)?(?:[^@\n]+@)?(?:www\.)?([^:\/\n?]+)/img;
    match = domainRegex.exec(url);

    width = 1024; //await page.viewport().width;
    height = 1000; //await page.viewport.height();
    await page.screenshot({
        path: "Screenshots/" + counter1 + ". " + match[1] + "- " + title.replace(/[\W_]+/g, "") + ".jpg",
        clip: {
            x: 0,
            y: 0,
            width: width,
            height: height
        }
    });

    await browser.close();
}

parser.on('readable', async function() { // <-- here we make it async
    while (record = parser.read()) {
        counter += 1;
        console.log(record.URL);
        await getRecord(record, counter) // <-- and we await each call
    }
});

还有其他方法,例如 Promise.mapfor..of,但我们暂时保持简单。

如果您想 运行 一组顺序的承诺,您可以使用 Bluebird 包中的 Promise.mapSeries。我知道这意味着要添加一个额外的包,但它很简单,不需要你构建一个排队系统。

http://bluebirdjs.com/docs/api/promise.mapseries.html

下面的代码最初将启动 10 个会话。每个会话完成后,它将使下一条记录出队并启动另一条记录,直到没有更多记录为止。这将确保最多 10 个同时 运行。

parser.on('readable', async () => {
    const maxNumberOfSessions = 10;
    let counter = 0;

    await Promise.all(Array.from({length: maxNumberOfSessions}, dequeueRecord));
    console.log("All records have been processed.");

    function dequeueRecord() {
        const nextRecord = parser.read();
        if(nextRecord) return processRecord(nextRecord).then(dequeueRecord);
    }

    async function processRecord(record) {
        const number = ++counter;
        console.log("Processing record #" + number + ": " + record.URL);

        const browser = await puppeteer.launch({defaultViewport: {width: 1024, height: 768}});
        const page = await browser.newPage();
        await page.goto(record.URL);
        const title = await page.title();
        const domainRegex = /^(?:https?:\/\/)?(?:[^@\n]+@)?(?:www\.)?([^:\/\n?]+)/img;
        const match = domainRegex.exec(record.URL);

        const width = 1024; // await page.viewport().width;
        const height = 1000; // await page.viewport().height;
        await page.screenshot({path: "Screenshots/" + number + ". " + match[1] + "- " + title.replace(/[\W_]+/g, "") + ".jpg", clip: {x: 0, y: 0, width, height}});

        await browser.close();    
    }
});

你可能想看看 puppeteer-cluster(免责声明:我是作者)。

你可以这样做:

(async () => {
    // create a cluster that handles 10 parallel browsers
    const cluster = await Cluster.launch({
        concurrency: Cluster.CONCURRENCY_BROWSER,
        maxConcurrency: 10,
    });

    // define the task
    await cluster.task(async ({ page, data: { counter, record} }) => {
        const url = record.URL;

        await page.goto(url);
        title = await page.title();
        domainRegex = /^(?:https?:\/\/)?(?:[^@\n]+@)?(?:www\.)?([^:\/\n?]+)/img;
        match = domainRegex.exec(url);

        width = 1024;//await page.viewport().width;
        height = 1000;//await page.viewport.height();
        await page.screenshot({path: "Screenshots/"+counter+". "+match[1] + "- " +title.replace(/[\W_]+/g,"")+".jpg", clip : {x:0, y:0, width: width, height: height}});
    });

    // queue your jobs
    parser.on('readable', function () {
        while (record = parser.read()) {
            counter += 1;
            cluster.queue({ counter, record });
        }
    });
})();

这将处理 10 个并行浏览器实例,还将处理浏览器崩溃和错误处理。