连接到 Apache Kafka 多节点集群中的 Zookeeper
Connecting to Zookeeper in a Apache Kafka Multi Node cluster
我按照以下说明设置了多节点 kafka 集群。
现在,如何连接到 zookeeper ?在 JAVA 的 Producer/consumer 侧只连接一个动物园管理员是否可以,或者有没有办法连接所有动物园管理员节点?
设置多节点 Apache ZooKeeper 集群
在集群的每个节点上,将以下行添加到文件 kafka/config/zookeeper.properties
server.1=zNode01:2888:3888
server.2=zNode02:2888:3888
server.3=zNode03:2888:3888
#add here more servers if you want
initLimit=5
syncLimit=2
在集群的每个节点上,在 dataDir 属性 表示的文件夹中创建一个名为 myid 的文件(默认文件夹为 /tmp/zookeeper )。 myid 文件应仅包含 znode 的 ID(zNode01 为“1”,ZNode02 为“2”,等等)
设置多代理 Apache Kafka 集群
在集群的每个节点上修改 属性 zookeeper.connect 从文件 kafka/config/server.properties:
zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181
在集群的每个节点上修改文件 kafka/config/server.properties 中的 属性 host.name:
host.name=zNode0x
在集群的每个节点上修改文件 kafka/config/server.properties 中的 属性 broker.id(集群中的每个代理都应该有一个唯一的 ID)
你可以传递生产者或消费者中的所有节点。 Kafka 足够智能,它会根据复制因子或分区连接到具有您所需数据的节点
消费者代码如下:
Properties props = new Properties();
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
您可以找到更多信息here
注意:这个方法的问题是它会打开多个连接来找出哪个节点保存数据。对于更健壮和可扩展的系统,您可以维护 分区号和节点名称 的映射,这也有助于负载平衡。
这是生产者样本
Properties props = new Properties();
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
更多信息here
无需在 Kafka 客户端(生产者和消费者)中传递 Zookeeper 连接属性。
从 Kafka-v9 及更高版本开始,Kafka 生产者和消费者不与 Zookeeper 通信。
我按照以下说明设置了多节点 kafka 集群。 现在,如何连接到 zookeeper ?在 JAVA 的 Producer/consumer 侧只连接一个动物园管理员是否可以,或者有没有办法连接所有动物园管理员节点?
设置多节点 Apache ZooKeeper 集群
在集群的每个节点上,将以下行添加到文件 kafka/config/zookeeper.properties
server.1=zNode01:2888:3888
server.2=zNode02:2888:3888
server.3=zNode03:2888:3888
#add here more servers if you want
initLimit=5
syncLimit=2
在集群的每个节点上,在 dataDir 属性 表示的文件夹中创建一个名为 myid 的文件(默认文件夹为 /tmp/zookeeper )。 myid 文件应仅包含 znode 的 ID(zNode01 为“1”,ZNode02 为“2”,等等)
设置多代理 Apache Kafka 集群
在集群的每个节点上修改 属性 zookeeper.connect 从文件 kafka/config/server.properties:
zookeeper.connect=zNode01:2181,zNode02:2181,zNode03:2181
在集群的每个节点上修改文件 kafka/config/server.properties 中的 属性 host.name: host.name=zNode0x
在集群的每个节点上修改文件 kafka/config/server.properties 中的 属性 broker.id(集群中的每个代理都应该有一个唯一的 ID)
你可以传递生产者或消费者中的所有节点。 Kafka 足够智能,它会根据复制因子或分区连接到具有您所需数据的节点
消费者代码如下:
Properties props = new Properties();
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
props.put("group.id", "test");
props.put("enable.auto.commit", "true");
props.put("auto.commit.interval.ms", "1000");
props.put("session.timeout.ms", "30000");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
consumer.subscribe(Arrays.asList("foo", "bar"));
while (true) {
ConsumerRecords<String, String> records = consumer.poll(100);
for (ConsumerRecord<String, String> record : records)
System.out.printf("offset = %d, key = %s, value = %s", record.offset(), record.key(), record.value());
}
您可以找到更多信息here
注意:这个方法的问题是它会打开多个连接来找出哪个节点保存数据。对于更健壮和可扩展的系统,您可以维护 分区号和节点名称 的映射,这也有助于负载平衡。
这是生产者样本
Properties props = new Properties();
props.put("bootstrap.servers", "acbd.com:9092,defg.com:9092");
props.put("acks", "all");
props.put("retries", 0);
props.put("batch.size", 16384);
props.put("linger.ms", 1);
props.put("buffer.memory", 33554432);
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
Producer<String, String> producer = new KafkaProducer<>(props);
for(int i = 0; i < 100; i++)
producer.send(new ProducerRecord<String, String>("my-topic", Integer.toString(i), Integer.toString(i)));
producer.close();
更多信息here
无需在 Kafka 客户端(生产者和消费者)中传递 Zookeeper 连接属性。
从 Kafka-v9 及更高版本开始,Kafka 生产者和消费者不与 Zookeeper 通信。