提高从 ActiveMQ 插入 Mongo 的性能
Improving performance of inserting into Mongo from ActiveMQ
以下代码的基本思想是我从 ActiveMQ Artemis 安装中读取消息并将它们插入到 MongoDB 实例中。
它每秒处理多达一百条左右的消息时运行良好,但如果我向它发送几千条消息,它就会崩溃。我的第一个猜测是不断打开和关闭数据库连接。我还应该考虑使用内存存储并进行批量数据库插入吗?
代码全部 运行 在节点中使用 mqtt 和 mongodb npm 包。下面的代码,数据库和队列都是 运行 在 docker 容器中,如果有什么区别的话。
var mqtt = require('mqtt'),
client = mqtt.connect('mqtt://mq:1883', {
username: "*************",
password: "*************"
}),
MongoClient = require('mongodb').MongoClient,
ObjectId = require('mongodb').ObjectID,
assert = require('assert'),
url = 'mongodb://db:27017/uo-readings';
client.on('connect', function () {
client.subscribe('readings');
});
client.on('error', function(error){
console.log(error)
});
client.on('message', function (topic, message) {
console.log(message.toString());
MongoClient.connect(url, function(err, db) {
assert.equal(null, err);
console.log("Connected correctly to server.");
db.collection('readings').insertOne(JSON.parse(message.toString()), function(err, result) {
assert.equal(err, null);
console.log("Inserted a document into the readings collection.");
});
client.end(function(){
console.log("Closing Connection.");
db.close();
});
});
});
参见上面@Jonathan Muller 的评论
以下代码的基本思想是我从 ActiveMQ Artemis 安装中读取消息并将它们插入到 MongoDB 实例中。
它每秒处理多达一百条左右的消息时运行良好,但如果我向它发送几千条消息,它就会崩溃。我的第一个猜测是不断打开和关闭数据库连接。我还应该考虑使用内存存储并进行批量数据库插入吗?
代码全部 运行 在节点中使用 mqtt 和 mongodb npm 包。下面的代码,数据库和队列都是 运行 在 docker 容器中,如果有什么区别的话。
var mqtt = require('mqtt'),
client = mqtt.connect('mqtt://mq:1883', {
username: "*************",
password: "*************"
}),
MongoClient = require('mongodb').MongoClient,
ObjectId = require('mongodb').ObjectID,
assert = require('assert'),
url = 'mongodb://db:27017/uo-readings';
client.on('connect', function () {
client.subscribe('readings');
});
client.on('error', function(error){
console.log(error)
});
client.on('message', function (topic, message) {
console.log(message.toString());
MongoClient.connect(url, function(err, db) {
assert.equal(null, err);
console.log("Connected correctly to server.");
db.collection('readings').insertOne(JSON.parse(message.toString()), function(err, result) {
assert.equal(err, null);
console.log("Inserted a document into the readings collection.");
});
client.end(function(){
console.log("Closing Connection.");
db.close();
});
});
});
参见上面@Jonathan Muller 的评论