控制 javascript 异步流的速率(在循环中)
Control the rate of a javascript asynchronous flow (in a loop)
假设您想用一个简短的代码为列表中的每个文件夹启动一个(随机)进程:
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere
_.each(folders, function(folder) {
exec("tar cvf " + folder + ".tgz " + folder);
});
如果列表很长,我可能会 运行 同时处理大量进程,这是要避免的。 运行 以受控速率(此处最多 5 个并发进程)执行的相当简单的方法是什么?
编辑:这个问题是针对每一种异步流(你想在其中控制速率),而不仅仅是文件夹执行问题。
使用async
包及其功能:eachLimit
它的作用与 lodash
相同,但使用异步流处理并进行迭代以确保一次不会 运行 超出限制:
var async = require('async');
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere
var maxProcesses = 5; // 5 items at a time
async.eachLimit(
folders, // collection
maxProcesses, // limit
function(folder, next) { // iterator function. args: item, callback
var cmd = "tar -cf " + folder + ".tgz " + folder;
console.log('calling:', cmd);
exec(cmd, function(err, stdOut, stdErr) { // executing cmd
if(err) console.error(err); // if error putting to console
next(); // passing the async flow to handle the next iteration
});
},
function() { // after all iterations finished
console.log('finished processing commands');
});
或 parallelLimit :
var async = require('async');
var _ = require('lodash');
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere
var callStack = [];
_.each(folders, function(folder) { // generating our stack of commands
callStack.push(function(done) {
var cmd = "tar -cf " + folder + ".tgz " + folder;
exec(cmd, function(err, stdOut, stdErr) {
if(err) console.error(err);
done(null, folder);
});
});
});
var maxProcesses = 5; // 5 items at a time
async.parallelLimit(callStack, maxProcesses, function() {console.log('finished');});
"making it look shorter":)
const
async = require('async'),
exec = require('child_process').exec;
let folders = [...];
async.eachLimit(folders, 5,
(folder, next) =>
exec("tar -cf " + folder + ".tgz " + folder, () => next()),
() => console.log('finished'));
和
const
async = require('async'),
exec = require('child_process').exec;
let folders = [...];
let commands = folders.map(folder => done => exec("tar -cf " + folder + ".tgz " + folder, () => done());
async.parallelLimit(commands, 5, () => console.log('finished'));
如果这些示例中的任何一个不适合您,或者您的系统非常大,那么让我们尝试使用像 rsmq
这样的消息队列系统
原生 Javascript
您所需要的只是某种负载均衡器。像这样将循环放入单独的函数中:
/**
* Loops through your Folderarray and begins at the given index.
* @param {[type]} lastIndex [last index which your loop stopped.]
* @param {[type]} maxProcesses [maximum of processes you want to have.]
* @param {[type]} folderArray [folder array.]
*/
function loopFolders(maxProcesses, lastIndex, folderArray){
// counter for our actual processes.
let processes = 0;
// This is to stop the loop, since JavaScript has no built-in break for loops.
let maximumReached = false;
// loop through array.
folderArray.forEach(function(element, index, array){
// Do stuff once you hit the last point.
if(index > lastIndex && !maximumReached){
// Check how many processes are running.
if(processes <= maxProcesses){
// create your child process.
let exec = require('child_process').exec;
// Do stuff with Folderelement from Folderarray.
exec("tar cvf " + element + ".tgz " + element);
// Increment processes.
processes++;
}else{
/**
* Magic begins here:
* If max processes was reached, retry after a while.
*/
// We are done for this loop.
maximumReached = true;
// Set timeout for 10 seconds and then retry.
setTimeout(function(){
// Calll function again.
loopFolders(maxProcesses, index, array);
}, 10000);
}
}
});
}
要从头开始调用此循环,您只需这样做:
// your Array of folders from somewhere.
let folders = [...];
// Call loopFolders with maximum of 5 and the beginning index of 0.
loopFolders(5, 0, folders);
此代码是负载均衡器的一个非常基本的示例。请记住,我的示例永远不会知道是否完成了任何其他过程。您可以使用某种回调来确定。但这至少对你有帮助。
要使用 NodeJS Childprocess 事件,请查看 https://nodejs.org/api/child_process.html
您可以对 'exit' 事件中的循环进行回调,以确保您的子进程不会失控。
希望对您有所帮助。
此致,
巨人
承诺
我只是喜欢承诺,并且喜欢尽可能遵守承诺。
这是我认为适合您的情况的解决方案。
var exec = require('child_process').exec;
var folders = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"];
var maxConcurrentProcessCount = 5;
var promiseArr = [];
folders.forEach(function (folder) {
var pr = {
start: function () {
if (pr.promise) return pr.promise;
return pr.promise = new Promise(function (resolve) {
exec("tar cvf " + folder + ".tgz " + folder,
undefined, (err, stdout, stderr) => {
// This is your logic, you can reject depending on err
var ind = promiseArr.indexOf(pr);
if (ind >= 0) promiseArr.splice(ind, 1);
resolve(stdout);
});
});
}
};
promiseArr.push(pr);
});
var racePromises = function () {
if (!promiseArr.length) return;
Promise.race(promiseArr.slice(0, maxConcurrentProcessCount).map(x => x.start())).then(racePromises);
console.log("Current running process count: " + promiseArr.filter(x => x.promise).length);
}
racePromises();
简短说明
创建一个数组,其中每个元素代表一个任务。首先 select 5 个并启动它们。每当其中一个完成时,将其从数组中删除并再次从数组中启动 5 个任务。
例子运行
用承诺重新创建 eachLimit
只是为了好玩
var myEachLimit = function (collection, maxConcurrentCalls, callback) {
return new Promise(function (resolve, reject) {
var promiseArr = [];
collection.forEach(function (item) {
var pr = {
start: function () {
if (pr.promise) return pr.promise;
return pr.promise = new Promise(function (resolve) {
callback.call(item, item, function () {
var ind = promiseArr.indexOf(pr);
if (ind >= 0) promiseArr.splice(ind, 1);
resolve();
});
});
}
};
promiseArr.push(pr);
});
var racePromises = function () {
if (!promiseArr.length) {
resolve();
return;
}
Promise.race(promiseArr.slice(0, maxConcurrentProcessCount).map(x => x.start())).then(racePromises);
console.log("Current running process count: " + promiseArr.filter(x => x.promise).length);
}
racePromises();
});
}
// Demo
var exec = require('child_process').exec;
var folders = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"];
var maxConcurrentProcessCount = 5;
myEachLimit(folders, maxConcurrentProcessCount, function (folder, next) {
exec("tar cvf " + folder + ".tgz " + folder, (err, stdout, stderr) => {
next();
});
}).then(function () {
console.log("Finished all processes");
});
假设您想用一个简短的代码为列表中的每个文件夹启动一个(随机)进程:
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere
_.each(folders, function(folder) {
exec("tar cvf " + folder + ".tgz " + folder);
});
如果列表很长,我可能会 运行 同时处理大量进程,这是要避免的。 运行 以受控速率(此处最多 5 个并发进程)执行的相当简单的方法是什么?
编辑:这个问题是针对每一种异步流(你想在其中控制速率),而不仅仅是文件夹执行问题。
使用async
包及其功能:eachLimit
它的作用与 lodash
相同,但使用异步流处理并进行迭代以确保一次不会 运行 超出限制:
var async = require('async');
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere
var maxProcesses = 5; // 5 items at a time
async.eachLimit(
folders, // collection
maxProcesses, // limit
function(folder, next) { // iterator function. args: item, callback
var cmd = "tar -cf " + folder + ".tgz " + folder;
console.log('calling:', cmd);
exec(cmd, function(err, stdOut, stdErr) { // executing cmd
if(err) console.error(err); // if error putting to console
next(); // passing the async flow to handle the next iteration
});
},
function() { // after all iterations finished
console.log('finished processing commands');
});
或 parallelLimit :
var async = require('async');
var _ = require('lodash');
var exec = require('child_process').exec;
var folders = [...]; // a list from somewhere
var callStack = [];
_.each(folders, function(folder) { // generating our stack of commands
callStack.push(function(done) {
var cmd = "tar -cf " + folder + ".tgz " + folder;
exec(cmd, function(err, stdOut, stdErr) {
if(err) console.error(err);
done(null, folder);
});
});
});
var maxProcesses = 5; // 5 items at a time
async.parallelLimit(callStack, maxProcesses, function() {console.log('finished');});
"making it look shorter":)
const
async = require('async'),
exec = require('child_process').exec;
let folders = [...];
async.eachLimit(folders, 5,
(folder, next) =>
exec("tar -cf " + folder + ".tgz " + folder, () => next()),
() => console.log('finished'));
和
const
async = require('async'),
exec = require('child_process').exec;
let folders = [...];
let commands = folders.map(folder => done => exec("tar -cf " + folder + ".tgz " + folder, () => done());
async.parallelLimit(commands, 5, () => console.log('finished'));
如果这些示例中的任何一个不适合您,或者您的系统非常大,那么让我们尝试使用像 rsmq
原生 Javascript
您所需要的只是某种负载均衡器。像这样将循环放入单独的函数中:
/**
* Loops through your Folderarray and begins at the given index.
* @param {[type]} lastIndex [last index which your loop stopped.]
* @param {[type]} maxProcesses [maximum of processes you want to have.]
* @param {[type]} folderArray [folder array.]
*/
function loopFolders(maxProcesses, lastIndex, folderArray){
// counter for our actual processes.
let processes = 0;
// This is to stop the loop, since JavaScript has no built-in break for loops.
let maximumReached = false;
// loop through array.
folderArray.forEach(function(element, index, array){
// Do stuff once you hit the last point.
if(index > lastIndex && !maximumReached){
// Check how many processes are running.
if(processes <= maxProcesses){
// create your child process.
let exec = require('child_process').exec;
// Do stuff with Folderelement from Folderarray.
exec("tar cvf " + element + ".tgz " + element);
// Increment processes.
processes++;
}else{
/**
* Magic begins here:
* If max processes was reached, retry after a while.
*/
// We are done for this loop.
maximumReached = true;
// Set timeout for 10 seconds and then retry.
setTimeout(function(){
// Calll function again.
loopFolders(maxProcesses, index, array);
}, 10000);
}
}
});
}
要从头开始调用此循环,您只需这样做:
// your Array of folders from somewhere.
let folders = [...];
// Call loopFolders with maximum of 5 and the beginning index of 0.
loopFolders(5, 0, folders);
此代码是负载均衡器的一个非常基本的示例。请记住,我的示例永远不会知道是否完成了任何其他过程。您可以使用某种回调来确定。但这至少对你有帮助。
要使用 NodeJS Childprocess 事件,请查看 https://nodejs.org/api/child_process.html
您可以对 'exit' 事件中的循环进行回调,以确保您的子进程不会失控。
希望对您有所帮助。
此致, 巨人
承诺
我只是喜欢承诺,并且喜欢尽可能遵守承诺。
这是我认为适合您的情况的解决方案。
var exec = require('child_process').exec;
var folders = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"];
var maxConcurrentProcessCount = 5;
var promiseArr = [];
folders.forEach(function (folder) {
var pr = {
start: function () {
if (pr.promise) return pr.promise;
return pr.promise = new Promise(function (resolve) {
exec("tar cvf " + folder + ".tgz " + folder,
undefined, (err, stdout, stderr) => {
// This is your logic, you can reject depending on err
var ind = promiseArr.indexOf(pr);
if (ind >= 0) promiseArr.splice(ind, 1);
resolve(stdout);
});
});
}
};
promiseArr.push(pr);
});
var racePromises = function () {
if (!promiseArr.length) return;
Promise.race(promiseArr.slice(0, maxConcurrentProcessCount).map(x => x.start())).then(racePromises);
console.log("Current running process count: " + promiseArr.filter(x => x.promise).length);
}
racePromises();
简短说明
创建一个数组,其中每个元素代表一个任务。首先 select 5 个并启动它们。每当其中一个完成时,将其从数组中删除并再次从数组中启动 5 个任务。
例子运行
用承诺重新创建 eachLimit
只是为了好玩
var myEachLimit = function (collection, maxConcurrentCalls, callback) {
return new Promise(function (resolve, reject) {
var promiseArr = [];
collection.forEach(function (item) {
var pr = {
start: function () {
if (pr.promise) return pr.promise;
return pr.promise = new Promise(function (resolve) {
callback.call(item, item, function () {
var ind = promiseArr.indexOf(pr);
if (ind >= 0) promiseArr.splice(ind, 1);
resolve();
});
});
}
};
promiseArr.push(pr);
});
var racePromises = function () {
if (!promiseArr.length) {
resolve();
return;
}
Promise.race(promiseArr.slice(0, maxConcurrentProcessCount).map(x => x.start())).then(racePromises);
console.log("Current running process count: " + promiseArr.filter(x => x.promise).length);
}
racePromises();
});
}
// Demo
var exec = require('child_process').exec;
var folders = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"];
var maxConcurrentProcessCount = 5;
myEachLimit(folders, maxConcurrentProcessCount, function (folder, next) {
exec("tar cvf " + folder + ".tgz " + folder, (err, stdout, stderr) => {
next();
});
}).then(function () {
console.log("Finished all processes");
});