使用 Node.js + Kafka 为消费者组创建分布式消费者

Create distributed consumers for a consumer group using Node.js + Kafka

我正在使用 Node.js 10、Apache Kafka 2.3 和 no-kafka npm 包。

目前我创建了一个主题,复制因子为 3,分区数为 3。我在 3 个不同的端口上运行了 3 个 Kafka broker。

使用 no-kafka,我可以看到根据分区数创建了 3 个消费者,并且都在同一台机器上。下面是代码以及运行时的输出快照。

代码:

const Kafka = require('no-kafka');
// Use a distinct name so the global Promise is not shadowed by bluebird.
const Bluebird = require('bluebird');

// Bootstrap list of the three local brokers (comma-separated, as no-kafka expects).
const consumer = new Kafka.GroupConsumer({
  connectionString: 'kafka://192.168.1.172:9092, kafka://192.168.1.172:9093, kafka://192.168.1.172:9094',
});

/**
 * Logs every message of a fetched batch and commits its offset.
 *
 * @param {Array} messageSet - messages fetched for one topic partition
 * @param {string} topic - topic the batch belongs to
 * @param {number} partition - partition the batch belongs to
 * @returns {Promise} settles after every message has been logged and committed
 */
const dataHandler = (messageSet, topic, partition) =>
  // Bluebird's each() walks the batch sequentially, so offsets are committed in order.
  Bluebird.each(messageSet, (m) => {
    // A null (tombstone) value has no toString(); print it as-is instead of crashing.
    const rawValue = m.message.value;
    const value = rawValue == null ? rawValue : rawValue.toString('utf8');
    console.log(`Topic: ${topic} , Partition: ${partition} , Offset: ${m.offset} , Message: ${value}`);
    return consumer.commitOffset({ topic, partition, offset: m.offset, metadata: 'optional' });
  });

const strategies = [{
  subscriptions: ['test'],
  handler: dataHandler,
}];

// init() returns a promise: report connection/join failures instead of leaving an unhandled rejection.
consumer.init(strategies).catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

当我启动生产者并运行上述代码时,控制台输出如下。

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-console-producer.sh --broker-list 192.168.1.172:9092 --topic test
>hey
>there
>how are you
>I am
>fine
>and
>how
>about
>you

下面是消费者的输出。

PS D:\checkout\javascript\sample projects\kafka> node .\consumer.js
2019-12-23T15:43:07.822Z INFO no-kafka-client Joined group no-kafka-group-v0.9 generationId 45 as no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7
2019-12-23T15:43:07.822Z INFO no-kafka-client Elected as group leader
2019-12-23T15:43:07.839Z DEBUG no-kafka-client Subscribed to test:0 offset 57 leader 192.168.1.172:9094
2019-12-23T15:43:07.840Z DEBUG no-kafka-client Subscribed to test:1 offset 56 leader 192.168.1.172:9094
2019-12-23T15:43:07.841Z DEBUG no-kafka-client Subscribed to test:2 offset 58 leader 192.168.1.172:9094
Topic: test , Partition: 2 , Offset: 58 , Message: hey
Topic: test , Partition: 1 , Offset: 56 , Message: there
Topic: test , Partition: 0 , Offset: 57 , Message: how are you
Topic: test , Partition: 1 , Offset: 57 , Message: fine
Topic: test , Partition: 2 , Offset: 59 , Message: I am
Topic: test , Partition: 0 , Offset: 58 , Message: and
Topic: test , Partition: 2 , Offset: 60 , Message: how
Topic: test , Partition: 0 , Offset: 59 , Message: you
Topic: test , Partition: 1 , Offset: 58 , Message: about

对于单个消费者而言,一切都按预期工作:

1. 自动创建了 1 个消费者

2. 消息以循环方式在主题分区中分配

3. 消费负载均匀分布在 3 个分区上以实现负载均衡,但都在同一台机器上。

当我使用 Kafka 自带的脚本查看消费者组状态时,控制台输出如下。

emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group  no-kafka-group-v0.9 --bootstrap-server 192.168.1.172:9093

GROUP               TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                          HOST            CLIENT-ID
no-kafka-group-v0.9 test            0          60              60              0               no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7 /192.168.1.48   no-kafka-client
no-kafka-group-v0.9 test            1          59              59              0               no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7 /192.168.1.48   no-kafka-client
no-kafka-group-v0.9 test            2          61              61              0               no-kafka-client-c916ce86-3808-4a59-a257-bdb4a04e8ff7 /192.168.1.48   no-kafka-client

问题:

  1. 唯一的问题是这些消费者都在同一台机器上。我希望它们分布在不同的机器上,以实现负载均衡并充分利用各台硬件的资源。
  2. 有办法实现吗?

Note: I am restricted to use Node.js

我使用 kafkajs npm 包解决了这个问题。

NOTE: Refer above in question to connect a producer via console.

代码:

const { Kafka } = require('kafkajs');

// Any reachable broker in this list can bootstrap the client; the rest are fallbacks.
const kafka = new Kafka({
  clientId: 'my-app',
  brokers: ['192.168.1.172:9092', '192.168.1.172:9093', '192.168.1.172:9094'],
});

// Every process started with this groupId joins the same consumer group, so Kafka
// splits the topic's partitions among them (one process per machine scales out).
const consumer = kafka.consumer({ groupId: 'test-group' });

/**
 * Connects, subscribes to `test` from the earliest offset and logs each message.
 *
 * @returns {Promise<void>} resolves once the consumer is running
 */
const run = async () => {
  await consumer.connect();
  await consumer.subscribe({ topic: 'test', fromBeginning: true });

  await consumer.run({
    eachMessage: async ({ partition, message }) => {
      console.log({
        partition,
        offset: message.offset,
        // message.value is null for tombstone records; avoid calling toString() on null.
        value: message.value === null ? null : message.value.toString(),
      });
    },
  });
};

run().catch((err) => {
  console.error(err);
  process.exitCode = 1;
});

/**
 * Leaves the group cleanly so the broker can rebalance the remaining members right away,
 * instead of waiting for this member's session timeout to expire.
 */
const shutdown = async () => {
  try {
    await consumer.disconnect();
    process.exit(0);
  } catch (err) {
    console.error(err);
    process.exit(1);
  }
};

['SIGINT', 'SIGTERM'].forEach((signal) => process.once(signal, shutdown));

单终端:

PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:19:02.613Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:19:02.625Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":270}
{"level":"INFO","timestamp":"2019-12-24T03:19:02.964Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"INFO","timestamp":"2019-12-24T03:19:03.015Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[0,1,2]},"groupProtocol":"RoundRobinAssigner","duration":48}
{"level":"ERROR","timestamp":"2019-12-24T03:19:03.620Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{ partition: 0, offset: '107', value: 'fgh' }
{ partition: 2, offset: '109', value: '' }
{ partition: 1, offset: '108', value: 'asdsa' }

同时运行 2 个终端:

当我打开另一个终端并在其中执行相同的命令时,控制台输出如下:

PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:22:21.229Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:22:21.236Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":257}
{"level":"INFO","timestamp":"2019-12-24T03:22:21.530Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"ERROR","timestamp":"2019-12-24T03:22:22.236Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{"level":"INFO","timestamp":"2019-12-24T03:22:26.026Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0,2]},"groupProtocol":"RoundRobinAssigner","duration":4495}

由于通过第二个终端加入了一个新的消费者,第一个消费者会在其控制台上打印如下日志。

{"level":"INFO","timestamp":"2019-12-24T03:22:26.023Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[1]},"groupProtocol":"RoundRobinAssigner","duration":39}

同时运行 3 个终端:

在保持前两个终端打开的同时,我再打开第 3 个终端,控制台输出如下。

PS D:\checkout\javascript\sample projects\kafka> node .\kafkajs-impl.js
{"level":"ERROR","timestamp":"2019-12-24T03:28:07.516Z","logger":"kafkajs","message":"[Connection] Connection timeout","broker":"192.168.1.172:9092","clientId":"my-app"}
{"level":"ERROR","timestamp":"2019-12-24T03:28:07.528Z","logger":"kafkajs","message":"[BrokerPool] Failed to connect to seed broker, trying another broker from the list: Connection timeout","retryCount":0,"retryTime":273}
{"level":"INFO","timestamp":"2019-12-24T03:28:07.865Z","logger":"kafkajs","message":"[Consumer] Starting","groupId":"test-group"}
{"level":"ERROR","timestamp":"2019-12-24T03:28:08.523Z","logger":"kafkajs","message":"[Connection] Connection error: connect ECONNREFUSED 192.168.1.172:9092","broker":"192.168.1.172:9092","clientId":"my-app","stack":"Error: connect ECONNREFUSED 192.168.1.172:9092\n    at TCPConnectWrap.afterConnect [as oncomplete] (net.js:1191:14)"}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.803Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-499da929-d351-4e59-94c9-88a18e97999d","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0]},"groupProtocol":"RoundRobinAssigner","duration":3937}

第 2 个终端中的再平衡日志如下:

{"level":"INFO","timestamp":"2019-12-24T03:22:26.026Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[0,2]},"groupProtocol":"RoundRobinAssigner","duration":4495}
{"level":"ERROR","timestamp":"2019-12-24T03:28:11.720Z","logger":"kafkajs","message":"[Connection] Response Heartbeat(key: 12, version: 1)","broker":"192.168.1.172:9093","clientId":"my-app","error":"The group is rebalancing, so a rejoin is needed","correlationId":144,"size":10}
{"level":"ERROR","timestamp":"2019-12-24T03:28:11.725Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":270}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.801Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":false,"memberAssignment":{"test":[1]},"groupProtocol":"RoundRobinAssigner","duration":70}

第 1 个终端中的再平衡日志如下:

{"level":"ERROR","timestamp":"2019-12-24T03:28:11.750Z","logger":"kafkajs","message":"[Runner] The group is rebalancing, re-joining","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","error":"The group is rebalancing, so a rejoin is needed","retryCount":0,"retryTime":337}
{"level":"INFO","timestamp":"2019-12-24T03:28:11.797Z","logger":"kafkajs","message":"[Runner] Consumer has joined the group","groupId":"test-group","memberId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","leaderId":"my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa","isLeader":true,"memberAssignment":{"test":[2]},"groupProtocol":"RoundRobinAssigner","duration":40}

所有消费者都运行起来之后:

通过生产者连续发送大量事件后,下面是各消费者监听各自分区的快照。现在每个消费者只监听该主题的 1 个特定分区,这很好,意味着它们现在可以部署在不同的机器上以实现并行处理。

以下是运行命令 ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093 得到的消费者状态

分区映射的状态
emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group test-group  --bootstrap-server 192.168.1.172:9093

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
test-group      test            2          126             126             0               my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa /192.168.1.48   my-app
test-group      test            1          124             124             0               my-app-945d6f38-bcda-4f02-b1a2-325957db5846 /192.168.1.48   my-app
test-group      test            0          124             124             0               my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13 /192.168.1.48   my-app

关闭 1 个消费者及其引发的再平衡(rebalancing)效果:

现在,如果我关闭其中 1 个消费者(例如第 3 个终端上的那个),下面是该主题的 3 个分区在剩余 2 个消费者之间重新平衡后的快照:

以下是运行命令 ./bin/kafka-consumer-groups.sh --describe --group test-group --bootstrap-server 192.168.1.172:9093 得到的消费者状态

分区映射的状态
emgda@ubuntu:~/softwares/kafka_2.12-2.3.0$ ./bin/kafka-consumer-groups.sh --describe --group test-group  --bootstrap-server 192.168.1.172:9093

GROUP           TOPIC           PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                 HOST            CLIENT-ID
test-group      test            0          123             123             0               my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13 /192.168.1.48   my-app
test-group      test            2          125             125             0               my-app-5d944889-d00b-48f8-91ea-ab0e0c4b6d13 /192.168.1.48   my-app
test-group      test            1          123             123             0               my-app-b1fec592-44ed-4ab7-bee3-5ed1850506aa /192.168.1.48   my-app