How to set Zookeeper IP address in ConsumerGroup instead of Kafka-Host IP address?

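I have to set the ZooKeeper IP address in the ConsumerGroup instead of the Kafka-Host IP address. Because I set the replication factor to 3, three brokers were created, so if one host goes down, another host can take over.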

When I put the ZooKeeper IP address in the ConsumerGroup instead of the Kafka-Host IP address, it does not receive any of the messages sent from the Producer API.

var kafka = require('kafka-node')
var ConsumerGroup = kafka.ConsumerGroup

function createConsumerGroup () {
  var options = {
    kafkaHost: '127.0.0.1:9092', // Kafka broker address
    batch: undefined,
    ssl: true,
    groupId: 'demoExample',
    protocol: ['roundrobin'],
    encoding: 'utf8',
    fromOffset: 'latest',
    commitOffsetsOnFirstJoin: true,
    outOfRangeOffset: 'earliest',
    onRebalance: (isAlreadyMember, callback) => { callback(); }
  }

  // Give each consumer a unique id, then subscribe to the 'example' topic
  var consumerGroup = new ConsumerGroup(Object.assign({ id: 'demo-' + process.pid }, options), 'example')

  consumerGroup.on('message', function (message) {
    message.value = JSON.parse(message.value)
    console.log('Message Received')
  })

  // Report connection/broker errors instead of failing silently
  consumerGroup.on('error', function (err) {
    console.error('ConsumerGroup error:', err)
  })

  return consumerGroup
}

createConsumerGroup()

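What I want is this: if I pass the ZooKeeper IP address instead of the Kafka-Host IP address in the ConsumerGroup, it should receive the messages sent from the Producer API on the "example" topic. If one broker fails, it should receive messages from another broker. Since the replication factor is set to 3, three brokers were created.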

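The new consumer API introduced in Kafka 0.9 does not need to connect to ZooKeeper at all. Group rebalancing is now handled by Kafka itself, on the brokers. Therefore you have to provide the Kafka host(s) rather than the ZooKeeper host.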

This blog post from Confluent should shed more light on it:

At the time of the 0.8.2 release of Apache Kafka, which released the redesigned producer client, we had promised to redesign the consumer client as well. And we kept our promise: the 0.9 release introduces beta support for the newly redesigned consumer client. At a high level, the primary difference in the new consumer is that it removes the distinction between the “high-level” ZooKeeper-based consumer and the “low-level” SimpleConsumer APIs, and instead offers a unified consumer API.

This new consumer is implemented using a powerful new server-side facility that makes group management a first-class part of Kafka’s protocols. This has several advantages. First it allows for a much more scalable group facility, which allows the consumer clients to be much simpler and thinner and allows for larger groups with far faster rebalancing. This facility is available to all clients; work is already nearing completion to use it in the C client, librdkafka. This same facility turns out to be broadly useful for managing distributed producing and consuming of data in Kafka; it is the basis for Kafka Connect as well as several upcoming projects. Finally this completes a series of projects done in the last few years to fully decouple Kafka clients from Zookeeper, thus entirely removing the consumer client’s dependency on ZooKeeper. Zookeeper is still used by Kafka, but it is an implementation detail of the broker–clients that use this new facility have no need to connect to Zookeeper at all. This has a number of operational benefits since clients are now always working through the security and quota mechanisms the broker provides. This significantly simplifies the consumer and opens the door for first-class non-Java implementations of the consumer API to emerge over time.
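A minimal sketch of what this means for the options object, assuming kafka-node's broker-based mode, where `kafkaHost` accepts a comma-separated list of `host:port` pairs. The three broker addresses and the `buildBrokerOptions` helper are hypothetical. Listing every broker lets the client bootstrap from whichever one is still reachable, which is the failover the question is after. The `kafka-node` lines are commented out so the snippet runs without a cluster.

```javascript
// Hypothetical addresses of a 3-broker cluster; replace with your own brokers.
const brokers = ['10.0.0.1:9092', '10.0.0.2:9092', '10.0.0.3:9092'];

// Hypothetical helper: builds ConsumerGroup options that can bootstrap
// from any of the listed brokers (kafkaHost is a comma-separated list).
function buildBrokerOptions (brokerList, groupId) {
  if (!Array.isArray(brokerList) || brokerList.length === 0) {
    throw new Error('at least one broker address is required');
  }
  return {
    kafkaHost: brokerList.join(','),
    groupId: groupId,
    protocol: ['roundrobin'],
    encoding: 'utf8',
    fromOffset: 'latest',
    outOfRangeOffset: 'earliest'
  };
}

const options = buildBrokerOptions(brokers, 'demoExample');
console.log(options.kafkaHost); // 10.0.0.1:9092,10.0.0.2:9092,10.0.0.3:9092

// With kafka-node installed and the brokers running:
// const { ConsumerGroup } = require('kafka-node');
// const consumerGroup = new ConsumerGroup(options, 'example');
```

The `ssl` setting from the question is left out here; add it back if your brokers actually listen on TLS.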

OK, the problem was in the consumerGroup options object.

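We have to pass the ZooKeeper IP address in the "host" key of the options object, not in the "kafkaHost" key. That solved the problem, and all the data sent by the Producer API was received. If one broker (replica) fails, it even switches to another one automatically.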

var options = {
  kafkaHost: '127.0.0.1:9092',
  batch: undefined,
  ssl: true,
  groupId: 'demoExample',
  protocol: ['roundrobin'],
  encoding: 'utf8',
  fromOffset: 'latest',
  commitOffsetsOnFirstJoin: true,
  outOfRangeOffset: 'earliest',
  onRebalance: (isAlreadyMember, callback) => { callback(); }
}

The following block fixes it:

var options = {
  host: '127.0.0.1:2181', // changed: ZooKeeper address under `host` instead of the broker under `kafkaHost`
  batch: undefined,
  ssl: true,
  groupId: 'demoExample',
  protocol: ['roundrobin'],
  encoding: 'utf8',
  fromOffset: 'latest',
  commitOffsetsOnFirstJoin: true,
  outOfRangeOffset: 'earliest',
  onRebalance: (isAlreadyMember, callback) => { callback(); }
}
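A sketch contrasting the two ways of connecting, assuming (check the README of your installed kafka-node version) that `host` selects the older ZooKeeper-based connection while `kafkaHost` connects directly to the brokers, and that newer kafka-node releases have dropped the ZooKeeper path. The `connectionOptions` helper and the extra broker ports 9093/9094 are hypothetical. Both modes take a comma-separated address list, so either can name all three nodes.

```javascript
// Hypothetical helper: returns only the connection part of the options.
// 'zookeeper' mode sets `host` (ZooKeeper connection string);
// 'broker' mode sets `kafkaHost` (comma-separated Kafka brokers).
function connectionOptions (mode, addresses) {
  const joined = addresses.join(',');
  if (mode === 'zookeeper') return { host: joined };
  if (mode === 'broker') return { kafkaHost: joined };
  throw new Error('unknown mode: ' + mode);
}

// Shared consumer settings from the question (ssl omitted in this sketch).
const baseOptions = {
  groupId: 'demoExample',
  protocol: ['roundrobin'],
  encoding: 'utf8',
  fromOffset: 'latest',
  commitOffsetsOnFirstJoin: true,
  outOfRangeOffset: 'earliest'
};

// The fix from the answer: ZooKeeper under `host`, no `kafkaHost` key at all.
const zkOptions = Object.assign({}, baseOptions,
  connectionOptions('zookeeper', ['127.0.0.1:2181']));

// Broker alternative: list every broker so any live one can bootstrap the client.
const brokerOptions = Object.assign({}, baseOptions,
  connectionOptions('broker', ['127.0.0.1:9092', '127.0.0.1:9093', '127.0.0.1:9094']));

console.log('kafkaHost' in zkOptions); // false
console.log(brokerOptions.kafkaHost); // 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
```

Keeping exactly one of the two keys in the object avoids any ambiguity about which connection mode the ConsumerGroup picks.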

Thanks.