使用 Node.js 将大型对象数组异步写入 Redis
Asynchronously Write Large Array of Objects to Redis with Node.js
我创建了一个 Node.js 脚本来创建大量随机生成的测试数据,我想将其写入 Redis 数据库。我正在使用 redis client library and the async 库。最初,我尝试在生成测试数据的 for
循环中执行 redisClient.hset(...)
命令,但经过一些谷歌搜索后,我了解到 Redis 方法是异步的,而 for
循环是同步的。在 Whosebug 上看到一些问题后,我无法让它按我想要的方式工作。
我可以毫无问题地向 Redis 写入小数组或更大的数组,例如包含 100,000 个项目的数组。但是,当我有一个包含 5,000,000 个项目的数组时,它无法正常工作。我最终没有足够的内存,因为 redis 命令似乎在排队,但直到 async.each(...)
完成并且节点进程不退出后才执行。如何让 Redis 客户端实际执行命令,正如我所说的 redisClient.hset(...)
?
这是我正在使用的代码片段。
var redis = require('redis');
var async = require('async');
var redisClient = redis.createClient(6379, '192.168.1.150');
var testData = generateTestData();
async.each(testData, function(item, callback) {
var someData = JSON.stringify(item.data);
redisClient.hset('item:'+item.key, 'hashKey', someData, function(err, reply) {
console.log("Item was persisted. Result: " +reply);
});
callback();
}, function(err) {
if (err) {
console.error(err);
} else {
console.log.info("Items have been persisted to Redis.");
}
});
您可以调用 eachLimit 以确保不会同时执行太多 redisClient.hset 调用。
为避免调用堆栈溢出,您可以 setTimeout(callback, 0);
而不是直接调用回调。
编辑:
忘了我说的setTimeout。您需要做的就是在正确的位置调用回调。像这样:
redisClient.hset('item:'+item.key, 'hashKey', someData, function(err, reply) {
console.log("Item was persisted. Result: " +reply);
callback();
});
您可能仍想使用 eachLimit 并尝试哪个限制最有效。
顺便说一句 - async.each 应该只用于在 javascript 事件队列(例如计时器、网络等)中安排回调调用的代码。切勿像原始代码那样在立即调用回调的代码上使用它。
编辑:
您可以实现自己的 eachLimit 函数,而不是数组,将生成器作为第一个参数。然后编写一个生成器函数来创建测试数据。为此,节点需要 运行 和 "node --harmony code.js".
function eachLimit(generator, limit, iterator, callback) {
var isError = false, j;
function startNextSetOfActions() {
var elems = [];
for(var i = 0; i < limit; i++) {
j = generator.next();
if(j.done) break;
elems.push(j.value);
}
var activeActions = elems.length;
if(activeActions === 0) {
callback(null);
}
elems.forEach(function(elem) {
iterator(elem, function(err) {
if(isError) return;
else if(err) {
callback(err);
isError = true;
return;
}
activeActions--;
if(activeActions === 0) startNextSetOfActions();
});
});
}
startNextSetOfActions();
}
function* testData() {
while(...) {
yield new Data(...);
}
}
eachLimit(testData(), 10, function(item, callback) {
var someData = JSON.stringify(item.data);
redisClient.hset('item:'+item.key, 'hashKey', someData, function(err, reply) {
if(err) callback(err);
else {
console.log("Item was persisted. Result: " +reply);
callback();
}
});
}, function(err) {
if (err) {
console.error(err);
} else {
console.log.info("Items have been persisted to Redis.");
}
});
我创建了一个 Node.js 脚本来创建大量随机生成的测试数据,我想将其写入 Redis 数据库。我正在使用 redis client library and the async 库。最初,我尝试在生成测试数据的 for
循环中执行 redisClient.hset(...)
命令,但经过一些谷歌搜索后,我了解到 Redis 方法是异步的,而 for
循环是同步的。在 Whosebug 上看到一些问题后,我无法让它按我想要的方式工作。
我可以毫无问题地向 Redis 写入小数组或更大的数组,例如包含 100,000 个项目的数组。但是,当我有一个包含 5,000,000 个项目的数组时,它无法正常工作。我最终没有足够的内存,因为 redis 命令似乎在排队,但直到 async.each(...)
完成并且节点进程不退出后才执行。如何让 Redis 客户端实际执行命令,正如我所说的 redisClient.hset(...)
?
这是我正在使用的代码片段。
var redis = require('redis');
var async = require('async');
var redisClient = redis.createClient(6379, '192.168.1.150');
var testData = generateTestData();
async.each(testData, function(item, callback) {
var someData = JSON.stringify(item.data);
redisClient.hset('item:'+item.key, 'hashKey', someData, function(err, reply) {
console.log("Item was persisted. Result: " +reply);
});
callback();
}, function(err) {
if (err) {
console.error(err);
} else {
console.log.info("Items have been persisted to Redis.");
}
});
您可以调用 eachLimit 以确保不会同时执行太多 redisClient.hset 调用。
为避免调用堆栈溢出,您可以 setTimeout(callback, 0);
而不是直接调用回调。
编辑:
忘了我说的setTimeout。您需要做的就是在正确的位置调用回调。像这样:
redisClient.hset('item:'+item.key, 'hashKey', someData, function(err, reply) {
console.log("Item was persisted. Result: " +reply);
callback();
});
您可能仍想使用 eachLimit 并尝试哪个限制最有效。
顺便说一句 - async.each 应该只用于在 javascript 事件队列(例如计时器、网络等)中安排回调调用的代码。切勿像原始代码那样在立即调用回调的代码上使用它。
编辑:
您可以实现自己的 eachLimit 函数,而不是数组,将生成器作为第一个参数。然后编写一个生成器函数来创建测试数据。为此,节点需要 运行 和 "node --harmony code.js".
function eachLimit(generator, limit, iterator, callback) {
var isError = false, j;
function startNextSetOfActions() {
var elems = [];
for(var i = 0; i < limit; i++) {
j = generator.next();
if(j.done) break;
elems.push(j.value);
}
var activeActions = elems.length;
if(activeActions === 0) {
callback(null);
}
elems.forEach(function(elem) {
iterator(elem, function(err) {
if(isError) return;
else if(err) {
callback(err);
isError = true;
return;
}
activeActions--;
if(activeActions === 0) startNextSetOfActions();
});
});
}
startNextSetOfActions();
}
function* testData() {
while(...) {
yield new Data(...);
}
}
eachLimit(testData(), 10, function(item, callback) {
var someData = JSON.stringify(item.data);
redisClient.hset('item:'+item.key, 'hashKey', someData, function(err, reply) {
if(err) callback(err);
else {
console.log("Item was persisted. Result: " +reply);
callback();
}
});
}, function(err) {
if (err) {
console.error(err);
} else {
console.log.info("Items have been persisted to Redis.");
}
});