Error when creating a topic with a chosen partition in Spring Kafka
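I'm learning to use Kafka with Spring Kafka in Kotlin. I understand that when you publish to a new topic, it is created if it does not exist. So when I send a value to a new or existing topic created from Spring, the default partition is 0, but I want to write a message to another partition, for example partition 1.
When I create or write to a topic like this, it works: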
val topicTesteKotlin = "topico-teste-kotlin"
fun sendTopicCallback(@PathVariable message : String) : ResponseEntity<String> {
    val msg = Optional.of(message)
    return if (msg.isPresent) {
        kafkaTemplate.send(topicTesteKotlin, message).addCallback({
            println("Sent message=[" + message +
                    "] with offset=[" + it!!.recordMetadata.offset() + "]")
        }, {
            println("Unable to send message=["
                    + message + "] due to : " + it.message)
        })
        ResponseEntity.ok(msg.get())
    } else {
        kafkaTemplate.send(topicTesteKotlin, "GET /send_topic_callback/message BadRequest > $message")
        ResponseEntity.badRequest().body("Bad request!")
    }
}
However, when I choose the partition and key like this:
val topicTesteKotlin = "topico-teste-kotlin"
fun sendTopicCallback(@PathVariable message : String) : ResponseEntity<String> {
    val msg = Optional.of(message)
    return if (msg.isPresent) {
        kafkaTemplate.send(topicTesteKotlin, 1, "1", message).addCallback({
            println("Sent message=[" + message +
                    "] with offset=[" + it!!.recordMetadata.offset() + "]")
        }, {
            println("Unable to send message=["
                    + message + "] due to : " + it.message)
        })
        ResponseEntity.ok(msg.get())
    } else {
        kafkaTemplate.send(topicTesteKotlin, "GET /send_topic_callback/message BadRequest > $message")
        ResponseEntity.badRequest().body("Bad request!")
    }
}
I get the following error:
org.apache.kafka.common.KafkaException: Invalid partition given with record: 1 is not in the range [0...1).
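I tried changing the key to 0.1, but that did not work either. Apparently, when I create the topic from the Spring client, only one partition is created, namely partition 0.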
Kafka producer configuration:
@Configuration
class KafkaProducerConfig {
    @Bean
    fun producerFactory() : ProducerFactory<String, String> {
        val configProps = HashMap<String,Any>()
        configProps[ProducerConfig.BOOTSTRAP_SERVERS_CONFIG] = "localhost:9092"
        configProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        configProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = StringSerializer::class.java
        return DefaultKafkaProducerFactory(configProps)
    }
    @Bean
    fun kafkaTemplate() : KafkaTemplate<String, String> {
        return KafkaTemplate(producerFactory())
    }
}
So, how can I create partitions from the Spring Kafka client?
You can manage topic creation with the following code:
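import java.util.HashMap;
import java.util.Map;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.KafkaAdmin;

@Configuration
public class KafkaTopicConfig {
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;
    private String testTopicName = "topico-teste-kotlin";

    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }

    @Bean
    public NewTopic testTopic() {
        // the second argument is the number of partitions, the third is the replication factor
        return new NewTopic(testTopicName, 2, (short) 1);
    }
}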
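As for why the original call fails: the range `[0...1)` in the error message is half-open, so the topic had a single partition and only index 0 was valid. That matches a topic auto-created with the broker default of one partition (`num.partitions` defaults to 1). The sketch below is only a simplified model of that check, using hypothetical names (`PartitionRangeCheck`, `isValidPartition`, `validRange`); it is not Kafka's actual code.

```java
// Simplified model of the range check behind the error message.
// Illustration only: the class and method names are hypothetical.
class PartitionRangeCheck {

    // A topic with partitionCount partitions accepts indices 0 .. partitionCount - 1.
    static boolean isValidPartition(int partition, int partitionCount) {
        return partition >= 0 && partition < partitionCount;
    }

    // Renders the valid range in the same half-open notation as the error message.
    static String validRange(int partitionCount) {
        return "[0..." + partitionCount + ")";
    }

    public static void main(String[] args) {
        int requested = 1; // the partition index passed to kafkaTemplate.send(...)
        // Compare a single-partition topic with a two-partition topic.
        for (int partitionCount : new int[] {1, 2}) {
            System.out.println("partitions=" + partitionCount
                    + " range=" + validRange(partitionCount)
                    + " partition " + requested + " valid="
                    + isValidPartition(requested, partitionCount));
        }
    }
}
```

Once the topic is declared with 2 partitions, as in the `NewTopic` bean, partition 1 falls inside the valid range, so `kafkaTemplate.send(topicTesteKotlin, 1, "1", message)` should no longer be rejected for that reason.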