控制 javascript 异步流的速率(在循环中)

Control the rate of a javascript asynchronous flow (in a loop)

假设您想用一个简短的代码为列表中的每个文件夹启动一个(随机)进程:

var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere

_.each(folders, function(folder) {
    exec("tar cvf " + folder + ".tgz " + folder);
});

如果列表很长,我可能会 运行 同时处理大量进程,这是要避免的。 运行 以受控速率(此处最多 5 个并发进程)执行的相当简单的方法是什么?

编辑:这个问题是针对每一种异步流(你想在其中控制速率),而不仅仅是文件夹执行问题。

使用async包及其功能:eachLimit

它的作用与 lodash 相同,但使用异步流处理并进行迭代以确保一次不会 运行 超出限制:

var async = require('async');
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere

var maxProcesses = 5; // 5 items at a time
async.eachLimit(
  folders, // collection
  maxProcesses, // limit
  function(folder, next) { // iterator function. args: item, callback
    var cmd = "tar -cf " + folder + ".tgz " + folder;
    console.log('calling:', cmd);
    exec(cmd, function(err, stdOut, stdErr) { // executing cmd
      if(err) console.error(err); // if error putting to console
      next(); // passing the async flow to handle the next iteration
    });
  },
  function() { // after all iterations finished
    console.log('finished processing commands');
  });

parallelLimit :

var async = require('async');
var _ = require('lodash');
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere

var callStack = [];
_.each(folders, function(folder) { // generating our stack of commands
  callStack.push(function(done) {
    var cmd = "tar -cf " + folder + ".tgz " + folder;
    exec(cmd, function(err, stdOut, stdErr) {
      if(err) console.error(err);
      done(null, folder);
    });
  });
});

var maxProcesses = 5; // 5 items at a time
async.parallelLimit(callStack, maxProcesses, function() {console.log('finished');});

"making it look shorter":)

const
  async = require('async'),
  exec = require('child_process').exec;

let folders = [...]; 
async.eachLimit(folders, 5, 
  (folder, next) => 
    exec("tar -cf " + folder + ".tgz " + folder, () => next()),
    () => console.log('finished'));

const
  async = require('async'),
  exec = require('child_process').exec;

let folders = [...]; 
let commands = folders.map(folder => done => exec("tar -cf " + folder + ".tgz " + folder, () => done());
async.parallelLimit(commands, 5, () => console.log('finished'));



如果这些示例中的任何一个不适合您,或者您的系统非常大,那么让我们尝试使用像 rsmq

这样的消息队列系统

原生 Javascript

您所需要的只是某种负载均衡器。像这样将循环放入单独的函数中:

  /**
  * Loops through your Folderarray and begins at the given index.
  * @param  {[type]} lastIndex       [last index which your loop stopped.]
  * @param  {[type]} maxProcesses    [maximum of processes you want to have.]
  * @param  {[type]} folderArray     [folder array.]
  */
  function loopFolders(maxProcesses, lastIndex, folderArray){

    // counter for our actual processes.
    let processes = 0;
    // This is to stop the loop, since JavaScript has no built-in break for loops.
    let maximumReached = false;

    // loop through array.
    folderArray.forEach(function(element, index, array){

      // Do stuff once you hit the last point.
      if(index > lastIndex && !maximumReached){

        // Check how many processes are running.
        if(processes <= maxProcesses){

          // create your child process.
          let exec = require('child_process').exec;
          // Do stuff with Folderelement from Folderarray.
          exec("tar cvf " + element + ".tgz " + element);

          // Increment processes.
          processes++;

        }else{
          /**
           * Magic begins here:
           * If max processes was reached, retry after a while.
           */

          // We are done for this loop.
           maximumReached = true;

           // Set timeout for 10 seconds and then retry.
          setTimeout(function(){
            // Calll function again.
            loopFolders(maxProcesses, index, array);
          }, 10000);
        }

      }

    });

  }

要从头开始调用此循环,您只需这样做:

// your Array of folders from somewhere.    
let folders = [...];
// Call loopFolders with maximum of 5 and the beginning index of 0.
loopFolders(5, 0, folders);

此代码是负载均衡器的一个非常基本的示例。请记住,我的示例永远不会知道是否完成了任何其他过程。您可以使用某种回调来确定。但这至少对你有帮助。

要使用 NodeJS Childprocess 事件,请查看 https://nodejs.org/api/child_process.html

您可以对 'exit' 事件中的循环进行回调,以确保您的子进程不会失控。

希望对您有所帮助。

此致, 巨人

承诺

我只是喜欢承诺,并且喜欢尽可能遵守承诺。

这是我认为适合您的情况的解决方案。

var exec = require('child_process').exec;
var folders = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"];
var maxConcurrentProcessCount = 5;
var promiseArr = [];

folders.forEach(function (folder) {
    var pr = {
        start: function () {
            if (pr.promise) return pr.promise;
            return pr.promise = new Promise(function (resolve) {
                exec("tar cvf " + folder + ".tgz " + folder,
                  undefined, (err, stdout, stderr) => {
                      // This is your logic, you can reject depending on err
                      var ind = promiseArr.indexOf(pr);
                      if (ind >= 0) promiseArr.splice(ind, 1);
                      resolve(stdout);
                  });
            });
        }
    };
    promiseArr.push(pr);
});

var racePromises = function () {
    if (!promiseArr.length) return;
    Promise.race(promiseArr.slice(0, maxConcurrentProcessCount).map(x => x.start())).then(racePromises);
    console.log("Current running process count: " + promiseArr.filter(x => x.promise).length);
}
racePromises();

简短说明

创建一个数组,其中每个元素代表一个任务。首先 select 5 个并启动它们。每当其中一个完成时,将其从数组中删除并再次从数组中启动 5 个任务。

例子运行

用承诺重新创建 eachLimit 只是为了好玩

var myEachLimit = function (collection, maxConcurrentCalls, callback) {
    return new Promise(function (resolve, reject) {
        var promiseArr = [];

        collection.forEach(function (item) {
            var pr = {
                start: function () {
                    if (pr.promise) return pr.promise;
                    return pr.promise = new Promise(function (resolve) {
                        callback.call(item, item, function () {
                            var ind = promiseArr.indexOf(pr);
                            if (ind >= 0) promiseArr.splice(ind, 1);
                            resolve();
                        });

                    });
                }
            };
            promiseArr.push(pr);
        });

        var racePromises = function () {
            if (!promiseArr.length) {
                resolve();
                return;
            }
            Promise.race(promiseArr.slice(0, maxConcurrentProcessCount).map(x => x.start())).then(racePromises);
            console.log("Current running process count: " + promiseArr.filter(x => x.promise).length);
        }
        racePromises();
    });
}


// Demo

var exec = require('child_process').exec;
var folders = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"];
var maxConcurrentProcessCount = 5;

myEachLimit(folders, maxConcurrentProcessCount, function (folder, next) {
    exec("tar cvf " + folder + ".tgz " + folder, (err, stdout, stderr) => {
        next();
    });
}).then(function () {
    console.log("Finished all processes");
});