How can I test a Spring Cloud Stream Kafka Streams application that uses Avro and the Confluent Schema Registry?
I can't figure out how to test a Spring Cloud Stream Kafka Streams application that uses Avro as the message format and the (Confluent) Schema Registry.

The configuration could be something like this:
```yaml
spring:
  application:
    name: shipping-service
  cloud:
    stream:
      schema-registry-client:
        endpoint: http://localhost:8081
      kafka:
        streams:
          binder:
            configuration:
              application:
                id: shipping-service
              default:
                key:
                  serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
              schema:
                registry:
                  url: ${spring.cloud.stream.schema-registry-client.endpoint}
              value:
                subject:
                  name:
                    strategy: io.confluent.kafka.serializers.subject.RecordNameStrategy
          bindings:
            input:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
            order:
              consumer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.GenericAvroSerde
            output:
              producer:
                valueSerde: io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
      bindings:
        input:
          destination: customer
        order:
          destination: order
        output:
          destination: order
server:
  port: 8086
logging:
  level:
    org.springframework.kafka.config: debug
```
Notes:

- Native serialization/deserialization is being used.
- Test framework: JUnit 5

I guess that for the Kafka broker I should use an EmbeddedKafkaBroker bean, but as you can see, the application also relies on a schema registry that should be mocked somehow. How?
It was a real pain to solve this, but in the end I managed to make it work using fluent-kafka-streams-tests.

Extra dependencies:

```kotlin
testImplementation("org.springframework.kafka:spring-kafka-test")
testImplementation("com.bakdata.fluent-kafka-streams-tests:schema-registry-mock-junit5:2.0.0")
```
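Note that the Confluent artifacts pulled in by the Avro serdes (e.g. kafka-streams-avro-serde, kafka-avro-serializer) are not published to Maven Central; assuming a Gradle Kotlin DSL build, you may also need to declare the Confluent repository:

```kotlin
repositories {
    mavenCentral()
    // Confluent serializers/serdes are hosted on Confluent's own Maven repository
    maven("https://packages.confluent.io/maven/")
}
```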
The key is to set the necessary configuration as system properties. For that, I created a separate test configuration class:
```kotlin
import com.bakdata.schemaregistrymock.SchemaRegistryMock
import javax.annotation.PostConstruct
import javax.annotation.PreDestroy
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.test.EmbeddedKafkaBroker

@Configuration
class KafkaTestConfiguration(private val embeddedKafkaBroker: EmbeddedKafkaBroker) {

    private val schemaRegistryMock = SchemaRegistryMock()

    @PostConstruct
    fun init() {
        // Point both Spring Kafka and the Kafka Streams binder at the embedded broker
        System.setProperty("spring.kafka.bootstrap-servers", embeddedKafkaBroker.brokersAsString)
        System.setProperty("spring.cloud.stream.kafka.streams.binder.brokers", embeddedKafkaBroker.brokersAsString)

        // Start the mock registry and point all schema registry configuration at it
        schemaRegistryMock.start()
        System.setProperty("spring.cloud.stream.schema-registry-client.endpoint", schemaRegistryMock.url)
        System.setProperty("spring.cloud.stream.kafka.streams.binder.configuration.schema.registry.url", schemaRegistryMock.url)
    }

    @Bean
    fun schemaRegistryMock(): SchemaRegistryMock {
        return schemaRegistryMock
    }

    @PreDestroy
    fun preDestroy() {
        schemaRegistryMock.stop()
    }
}
```
And finally the test class, in which you can now produce and consume Avro messages, have your KStreams process them, and take advantage of the mocked schema registry:
```kotlin
@EmbeddedKafka
@SpringBootTest(properties = [
    "spring.profiles.active=local",
    "schema-registry.user=",
    "schema-registry.password=",
    "spring.cloud.stream.bindings.event.destination=event",
    "spring.cloud.stream.bindings.event.producer.useNativeEncoding=true",
    "spring.cloud.stream.kafka.streams.binder.configuration.application.server=localhost:8080",
    "spring.cloud.stream.kafka.streams.bindings.event.consumer.keySerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde",
    "spring.cloud.stream.kafka.streams.bindings.event.consumer.valueSerde=io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde"])
class MyApplicationTests {

    @Autowired
    private lateinit var embeddedKafka: EmbeddedKafkaBroker

    @Autowired
    private lateinit var schemaRegistryMock: SchemaRegistryMock

    @Test
    fun `should process events`() {
        val senderProps = KafkaTestUtils.producerProps(embeddedKafka)
        senderProps[ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG] = "io.confluent.kafka.serializers.KafkaAvroSerializer"
        senderProps[ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG] = "io.confluent.kafka.serializers.KafkaAvroSerializer"
        senderProps["schema.registry.url"] = schemaRegistryMock.url
        val pf = DefaultKafkaProducerFactory<Int, String>(senderProps)
        try {
            val template = KafkaTemplate(pf, true)
            template.defaultTopic = "event"
            // ...
        }
```
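To verify the processed output on the other side, a test consumer can be pointed at the same mock registry. Here is a minimal sketch of the property map such a consumer would need; the group id is a made-up name for illustration, and `specific.avro.reader` is only required if you want generated SpecificRecord classes back instead of GenericRecord:

```kotlin
// Sketch: properties for a test consumer that reads the Avro records back,
// deserializing against the mocked schema registry URL.
fun avroConsumerProps(brokers: String, schemaRegistryUrl: String): Map<String, Any> = mapOf(
    "bootstrap.servers" to brokers,
    "group.id" to "test-group",            // illustrative group id
    "auto.offset.reset" to "earliest",
    "key.deserializer" to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    "value.deserializer" to "io.confluent.kafka.serializers.KafkaAvroDeserializer",
    // Must match the URL returned by SchemaRegistryMock
    "schema.registry.url" to schemaRegistryUrl,
    // Deserialize into generated SpecificRecord classes rather than GenericRecord
    "specific.avro.reader" to "true"
)
```

These properties can then be fed into a `DefaultKafkaConsumerFactory` the same way `senderProps` is used for the producer above.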