nodejs- kafka-node 模块- 从每个分区获取相同的消息
nodejs- kafka-node module- getting the same msg from each partition
我正在使用节点模块 "kafka-node" 创建一个 kafa-consumer:
var client = new kafka.Client(ZOO_KEEPER_URL, PORTAL_CLIENT_ID);
var topics = [
{ "topic" : MY_TOPIC_NAME,
partition: 0
},
{ "topic" : MY_TOPIC_NAME,
partition: 1 }
];
var options = {
autoCommit: false,
groupId: GROUP_ID,
fromOffset : false
};
var consumer = new Consumer(client, topics, options);
consumer.on('message', function (message) {
console.log("New Message; offset: "+ message.offset + " Partition: "+message.partition);
});
发生的情况是从分区 0 中检索一次相同的偏移量,然后从分区 1 中检索一次。
我期望的是 ZooKeeper 将知道如何平衡和管理它。
注:
用例是从 Kafka 读取任何可用的消息 -> 写入 Elastic Search -> 提交给 Kafka(这就是自动提交设置为 false 的原因)
每个分区都有自己的偏移量,包含不同的数据。因此,如果您收到来自分区 0 的偏移量为零的消息和来自分区 1 的偏移量为零的消息,则这是两条不同的消息。同样在提交时,所有分区的偏移量都是独立存储的。
我正在使用节点模块 "kafka-node" 创建一个 kafa-consumer:
var client = new kafka.Client(ZOO_KEEPER_URL, PORTAL_CLIENT_ID);
var topics = [
{ "topic" : MY_TOPIC_NAME,
partition: 0
},
{ "topic" : MY_TOPIC_NAME,
partition: 1 }
];
var options = {
autoCommit: false,
groupId: GROUP_ID,
fromOffset : false
};
var consumer = new Consumer(client, topics, options);
consumer.on('message', function (message) {
console.log("New Message; offset: "+ message.offset + " Partition: "+message.partition);
});
发生的情况是从分区 0 中检索一次相同的偏移量,然后从分区 1 中检索一次。 我期望的是 ZooKeeper 将知道如何平衡和管理它。
注: 用例是从 Kafka 读取任何可用的消息 -> 写入 Elastic Search -> 提交给 Kafka(这就是自动提交设置为 false 的原因)
每个分区都有自己的偏移量,包含不同的数据。因此,如果您收到来自分区 0 的偏移量为零的消息和来自分区 1 的偏移量为零的消息,则这是两条不同的消息。同样在提交时,所有分区的偏移量都是独立存储的。