Producer can't distribute messages to multiple consumers?
I'm trying to autoscale Java Spring Kafka consumers with Docker, using the --scale flag and forwarded port ranges in docker-compose.yaml such as "8070-8072:8070". When I trigger the endpoint that publishes messages, it works well. But on the consumer side, only 1 consumer consumes all the messages. I have 3 consumers with the same group id and different client ids, and what I want is distributed message delivery. I read some articles about partitioning and checked my logs, and it seems there is only 1 partition. Is that the reason? How can I achieve this? I'll add the log, the consumer config, the publisher config, and the docker-compose file. First, the log (attached as a screenshot): it seems only 1 of the 3 consumers is assigned a partition.
Command:
docker-compose up --build --scale nonwebapp=3 --scale webapp=3
docker-compose.yaml
kafka:
  image: confluentinc/cp-kafka:latest
  depends_on:
    - zookeeper
  ports:
    - "9092:9092"
  environment:
    KAFKA_LOG_DIRS: "/kafka/kafka-logs"
    KAFKA_BROKER_ID: 1
    KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
    KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
    KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
    KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
    KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 3
    KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
  volumes:
    - /var/run/docker.sock:/var/run/docker.sock
  restart: always

webapp:
  build: benchmark.web
  command: bash -c "while ! </dev/tcp/kafka/29092; do >&2 echo 'Waiting for kafka to be up...'; sleep 1; done;"
  ports:
    - "8070-8072:8070"
  volumes:
    - ./logs:/logs
    - ${GCP_KEY_PATH}:/root/keys/keyfile.json:ro
  depends_on:
    - kafka
  environment:
    SPRING_KAFKA_BOOTSTRAPSERVERS: kafka:29092
    TZ: "Asia/Istanbul"
    GOOGLE_APPLICATION_PROJECT_ID: apt-cubist-282712
    GOOGLE_APPLICATION_CREDENTIALS: /root/keys/keyfile.json
  restart: always

nonwebapp:
  build: benchmark.nonweb
  command: bash -c "while ! </dev/tcp/kafka/29092; do >&2 echo 'Waiting for kafka to be up...'; sleep 1; done;"
  depends_on:
    - kafka
  volumes:
    - ${GCP_KEY_PATH}:/root/keys/keyfile.json:ro
  ports:
    - "8060-8062:8060"
  environment:
    SPRING_KAFKA_BOOTSTRAPSERVERS: kafka:29092
    GOOGLE_APPLICATION_PROJECT_ID: apt-cubist-282712
    GOOGLE_APPLICATION_CREDENTIALS: /root/keys/keyfile.json
    TZ: "Asia/Istanbul"
  restart: always
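(Note that KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true" means topics are created on first use with the broker default num.partitions, which is 1 unless overridden; on the Confluent images this default should be adjustable by adding KAFKA_NUM_PARTITIONS to the kafka service's environment.)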
Producer config
@Bean
ProducerFactory<String, byte[]> producerFactory() {
    Map<String, Object> configProps = new HashMap<>();
    configProps.put(
            ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaInContainerAdress);
    /*
    configProps.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaInLocal);
    */
    configProps.put(
            ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
            StringSerializer.class);
    configProps.put(
            ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
            ByteArraySerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}

@Bean
public KafkaTemplate<String, byte[]> kafkaTemplate() {
    return new KafkaTemplate<>(producerFactory());
}
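For context, a minimal sketch of how this template might be used to publish (the controller class, endpoint, and payload are placeholders; the topic name topic_trial is taken from the logs discussed below). Records sent without a key are spread across the topic's partitions by the default partitioner, so distribution only happens if more than one partition exists:

import java.nio.charset.StandardCharsets;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class PublishController {

    private final KafkaTemplate<String, byte[]> kafkaTemplate;

    public PublishController(KafkaTemplate<String, byte[]> kafkaTemplate) {
        this.kafkaTemplate = kafkaTemplate;
    }

    @PostMapping("/publish")
    public void publish(@RequestBody String message) {
        // No key: the default partitioner distributes records across the
        // topic's partitions (round-robin / sticky, depending on client version).
        kafkaTemplate.send("topic_trial", message.getBytes(StandardCharsets.UTF_8));
    }
}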
Consumer config
@Bean
public ConsumerFactory<String, byte[]> consumerFactory() {
    Map<String, Object> props = new HashMap<>();
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaInContainerAdress);
    /*
    props.put(
            ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG,
            kafkaInLocal);
    */
    props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
            "org.apache.kafka.clients.consumer.RoundRobinAssignor");
    // client.id must be a String (r is assumed to be a java.util.Random field)
    props.put(ConsumerConfig.CLIENT_ID_CONFIG, String.valueOf(r.nextInt()));
    props.put(
            ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,
            StringDeserializer.class);
    props.put(
            ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,
            ByteArrayDeserializer.class);
    props.put(ConsumerConfig.GROUP_ID_CONFIG, "topic_trial_consumers");
    return new DefaultKafkaConsumerFactory<>(props);
}

@Bean
public ConcurrentKafkaListenerContainerFactory<String, byte[]> kafkaListenerContainerFactory() {
    ConcurrentKafkaListenerContainerFactory<String, byte[]> factory =
            new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    return factory;
}
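And a minimal sketch of a listener that would run against this container factory (class and method names are placeholders). Every scaled instance joins the same consumer group, and the group coordinator splits the topic's partitions among the members; with a single partition there is nothing to split:

import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

@Component
public class TrialListener {

    // One consumer per application instance; members of the same group
    // divide the topic's partitions among themselves.
    @KafkaListener(topics = "topic_trial", groupId = "topic_trial_consumers")
    public void listen(byte[] message) {
        System.out.println("Received " + message.length + " bytes");
    }
}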
It seems there is just 1 partition. Is that the reason?
Yes. If there is only one partition, only one consumer from the consumer group can consume it; the other consumers in the same group stay idle even if they are up.
It seems just 1 of the 3 has a partition
From your picture I can see topic_trial-0, so that is the first (and only) partition of topic_trial. Increase the number of partitions, e.g. to 3, start three consumers with the same group.id, and the load should be distributed (one partition each).
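As a sketch of one way to do this, assuming Spring Kafka 2.3+ with a KafkaAdmin bean in the context (Spring Boot auto-configures one): declare the topic with the desired partition count, and on startup KafkaAdmin will create it, or add partitions if the topic already exists with fewer. Alternatively, you can alter the existing topic from the CLI with something like kafka-topics --bootstrap-server localhost:9092 --alter --topic topic_trial --partitions 3.

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.TopicBuilder;

@Configuration
public class TopicConfig {

    // 3 partitions so each of the 3 group members can be assigned one;
    // replicas(1) because the compose file runs a single broker.
    @Bean
    public NewTopic topicTrial() {
        return TopicBuilder.name("topic_trial")
                .partitions(3)
                .replicas(1)
                .build();
    }
}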