将消息从 kafka 消费者推送到 mongodb
push messages from kafka consumer to mongodb
我在
事件中使用 'kafka-node' 创建了 kafka 消费者
consumer.on('message' ()=>{
connecting to mongodb and inserting to a collection.
})
mongo.js 文件用于创建与 mongo 和 return 对象的连接
const MongoClient = require('mongodb').MongoClient, assert = require('assert');
const url = 'mongodb://root:****@ds031257.mlab.com:31257/kafka-node';
let _db;
const connectDB = (callback) => {
try {
MongoClient.connect(url, { useNewUrlParser: true }, (err, database) => {
console.log('message' + database)
_db = database.db('kafka-node');
return callback(err);
})
} catch (e) {
throw e;
}
}
const getDB = () => _db;
const close = () => _db.close();
module.exports = { connectDB, getDB, close }
consumer.js是创建消费者并将消息推送到mongodb
let kafka = require('kafka-node');
let MongoDB = require('./mongo');
let Consumer = kafka.Consumer,
// The client specifies the ip of the Kafka producer and uses
// the zookeeper port 2181
client = new kafka.KafkaClient({ kafkaHost: 'localhost:9093, localhost:9094, localhost:9095' });
// The consumer object specifies the client and topic(s) it subscribes to
consumer = new Consumer(
client, [{ topic: 'infraTopic', partitions: 3 }], { autoCommit: false });
consumer.on('ready', function () {
console.log('consumer is ready');
});
consumer.on('error', function (err) {
console.log('consumer is in error state');
console.log(err);
})
client.refreshMetadata(['infraTopic'], (err) => {
if (err) {
console.warn('Error refreshing kafka metadata', err);
}
});
consumer.on('message', function (message) {
// grab the main content from the Kafka message
console.log(message);
MongoDB.connectDB((err) => {
if (err) throw err
// Load db & collections
const db = MongoDB.getDB();
const collectionKafka = db.collection('sampleCollection');
try {
collectionKafka.insertOne(
{
timestamp: message.value,
topic: message.topic
},
function (err, res) {
if (err) {
database.close();
return console.log(err);
}
// Success
}
)
} catch (e) {
throw e
}
})
});
这是将消息从 kafka 消费者推送到 mongodb 的正确方法吗?
使用此设置,它会一直工作,直到写入所有消息,一旦到达 EOL,它就会抛出 "Cannot read property 'db' of null"
is this the right way to push messages to mongodb from a kafka consumer?
我想这是一种方式,但我不会称它为正确方式:)
更好的是使用 Kafka Connect。它是 Apache Kafka 的一部分,它旨在完全按照您的意愿进行操作 - 将数据从 Kafka 流式传输到目标系统(您也可以使用它来将数据从其他系统 流式传输到 卡夫卡)。
有一个 excellent connector for MongoDB with comprehensive documentation 可以完全满足您的要求。
如果您需要在写入数据之前处理数据,那么要遵循的模式是使用 Kafka Streams、KSQL 或您想要使用的任何处理工具进行处理——但将其写回 回到 Kafka 主题。该主题随后由 Kafka Connect 读取并流式传输到您的目标。这样你就可以分离责任,并构建一个更简单但更具弹性和可扩展性的系统。
我在
事件中使用 'kafka-node' 创建了 kafka 消费者consumer.on('message' ()=>{
connecting to mongodb and inserting to a collection.
})
mongo.js 文件用于创建与 mongo 和 return 对象的连接
const MongoClient = require('mongodb').MongoClient, assert = require('assert');
const url = 'mongodb://root:****@ds031257.mlab.com:31257/kafka-node';
let _db;
const connectDB = (callback) => {
try {
MongoClient.connect(url, { useNewUrlParser: true }, (err, database) => {
console.log('message' + database)
_db = database.db('kafka-node');
return callback(err);
})
} catch (e) {
throw e;
}
}
const getDB = () => _db;
const close = () => _db.close();
module.exports = { connectDB, getDB, close }
consumer.js是创建消费者并将消息推送到mongodb
let kafka = require('kafka-node');
let MongoDB = require('./mongo');
let Consumer = kafka.Consumer,
// The client specifies the ip of the Kafka producer and uses
// the zookeeper port 2181
client = new kafka.KafkaClient({ kafkaHost: 'localhost:9093, localhost:9094, localhost:9095' });
// The consumer object specifies the client and topic(s) it subscribes to
consumer = new Consumer(
client, [{ topic: 'infraTopic', partitions: 3 }], { autoCommit: false });
consumer.on('ready', function () {
console.log('consumer is ready');
});
consumer.on('error', function (err) {
console.log('consumer is in error state');
console.log(err);
})
client.refreshMetadata(['infraTopic'], (err) => {
if (err) {
console.warn('Error refreshing kafka metadata', err);
}
});
consumer.on('message', function (message) {
// grab the main content from the Kafka message
console.log(message);
MongoDB.connectDB((err) => {
if (err) throw err
// Load db & collections
const db = MongoDB.getDB();
const collectionKafka = db.collection('sampleCollection');
try {
collectionKafka.insertOne(
{
timestamp: message.value,
topic: message.topic
},
function (err, res) {
if (err) {
database.close();
return console.log(err);
}
// Success
}
)
} catch (e) {
throw e
}
})
});
这是将消息从 kafka 消费者推送到 mongodb 的正确方法吗? 使用此设置,它会一直工作,直到写入所有消息,一旦到达 EOL,它就会抛出 "Cannot read property 'db' of null"
is this the right way to push messages to mongodb from a kafka consumer?
我想这是一种方式,但我不会称它为正确方式:)
更好的是使用 Kafka Connect。它是 Apache Kafka 的一部分,它旨在完全按照您的意愿进行操作 - 将数据从 Kafka 流式传输到目标系统(您也可以使用它来将数据从其他系统 流式传输到 卡夫卡)。
有一个 excellent connector for MongoDB with comprehensive documentation 可以完全满足您的要求。
如果您需要在写入数据之前处理数据,那么要遵循的模式是使用 Kafka Streams、KSQL 或您想要使用的任何处理工具进行处理——但将其写回 回到 Kafka 主题。该主题随后由 Kafka Connect 读取并流式传输到您的目标。这样你就可以分离责任,并构建一个更简单但更具弹性和可扩展性的系统。