Spring Cloud Stream 生成不必要的复杂 Kafka 拓扑,为什么?
Spring Cloud Stream generates unnecessary complex Kafka topologies, why?
我有一个 KStream 应用程序,其中包含大量 KStreams、连接和其他操作。我启用了 logging.level.org.springframework.kafka.config=debug
来验证正在生成的 Topology,并发现了很多根本没有意义的节点。
然后我将应用程序简化为:
interface ShippingKStreamProcessor {
@Input("input")
fun input(): KStream<Int, Customer>
}
@Suppress("UNCHECKED_CAST")
@Configuration
class ShippingKStreamConfiguration {
@StreamListener
fun process(@Input("input") input: KStream<Int, Customer> {}
}
奇怪的是,这样一个简单的 KStream 声明生成了这个复杂的拓扑结构:
2019-04-30 23:47:03.881 DEBUG 2944 --- [ main] o.s.k.config.StreamsBuilderFactoryBean : Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [customer])
--> KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
--> KSTREAM-BRANCH-0000000003, KSTREAM-PROCESSOR-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-BRANCH-0000000003 (stores: [])
--> KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005
<-- KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-BRANCHCHILD-0000000004 (stores: [])
--> KSTREAM-MAPVALUES-0000000007
<-- KSTREAM-BRANCH-0000000003
Processor: KSTREAM-BRANCHCHILD-0000000005 (stores: [])
--> KSTREAM-PROCESSOR-0000000006
<-- KSTREAM-BRANCH-0000000003
Processor: KSTREAM-MAPVALUES-0000000007 (stores: [])
--> none
<-- KSTREAM-BRANCHCHILD-0000000004
Processor: KSTREAM-PROCESSOR-0000000002 (stores: [])
--> none
<-- KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
--> none
<-- KSTREAM-BRANCHCHILD-0000000005
本机 Kafka 应用程序中的相同简单流会产生更符合逻辑的拓扑:
fun main(args: Array<String>) {
val builder = StreamsBuilder()
val streamsConfiguration = Properties()
streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "kafka-shipping-service"
streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
streamsConfiguration[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
val serdeConfig = mapOf(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081",
AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY to TopicRecordNameStrategy::class.java.name
)
//val byteArraySerde = Serdes.ByteArray()
val intSerde = Serdes.IntegerSerde()
val customerSerde = SpecificAvroSerde<Customer>()
customerSerde.configure(serdeConfig, false)
val customerStream = builder.stream<Int, Customer>("customer",
Consumed.with(intSerde, customerSerde)) as KStream<Int, Customer>
val topology = builder.build()
println(topology.describe())
val streams = KafkaStreams(topology, streamsConfiguration)
streams.start()
}
拓扑:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [customer])
--> none
Cloud Stream Spring 生成如此复杂拓扑的原因是什么?
@codependent 在拓扑中有这些额外处理器的原因是因为您正在使用框架提供的 de/serailzers(本机解码和编码默认为 false
)。基本上,我们从 Kafka 主题接收数据作为 byte[]
,然后在内部进行转换。对于这些转换,我们会使用一些额外的处理器,因此您最终会得到更深层次的拓扑结构。
这是 Java 中的基本 StreamListener
(与上面的差不多,但使用更简单的值类型):
@StreamListener
public void process(@Input("input") KStream<Integer, String> input ) {
}
通过活页夹中开箱即用的标准设置,我能够获得与您观察到的相同的更深层次的拓扑结构。但是,当我如下所示修改应用程序的配置时,
spring.cloud.stream.kafka.streams:
binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
我的拓扑缩减如下:
2019-05-01 18:02:12.705 DEBUG 67539 --- [ main] o.s.k.config.StreamsBuilderFactoryBean : Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [hello-1])
--> KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
--> none
<-- KSTREAM-SOURCE-0000000000
这仍然与您从普通 Kafka Streams 应用程序获得的拓扑不同,但事实证明我们可以在活页夹中改进以避免这种情况。简而言之,通过切换到 Kafka Streams 提供的本机解码和编码,您可以避免绑定器构建的所有这些额外级别的拓扑。
在某些情况下,您别无选择,只能依赖 Spring Cloud Stream 提供的反序列化,例如,您从基于 Spring 的生产者接收数据Cloud Stream 使用了一些特殊的序列化器。我认为您的情况是这样,因为据我所知,您的制作人基于 Spring Cloud Stream 并且使用框架提供的 Avro 序列化程序。在这种情况下,在您的处理器中使用 Kafka Stream 的 Avro Serde
将无法工作,因为这些序列化器不兼容。所以这是您的一些选择。
方法 #1:
- 让您的生产者使用 Kafka 提供的本机序列化程序。
- 然后在您的 Kafka Streams 应用程序中使用使用相同 serializer/deserializer 的 Serde。
方法 #2:
- 使用 SCSt 提供的消息序列化程序。
- 然后使用 Kafka Streams 绑定程序提供的默认 de/serialization,这是默认设置。
#2 的缺点显然是您在上面提到的,即更深层次的拓扑结构。根据您的用例和吞吐量,这可能没问题。如果这成为一个真正的性能问题,我们可以尝试在框架完成转换时简化这个过程。
综上所述,我在 Kafka 活页夹中创建了一个 issue,以便在活页夹的下一个版本中进行更改。欢迎您的反馈、建议和 up/down 投票。
我有一个 KStream 应用程序,其中包含大量 KStreams、连接和其他操作。我启用了 logging.level.org.springframework.kafka.config=debug
来验证正在生成的 Topology,并发现了很多根本没有意义的节点。
然后我将应用程序简化为:
interface ShippingKStreamProcessor {
@Input("input")
fun input(): KStream<Int, Customer>
}
@Suppress("UNCHECKED_CAST")
@Configuration
class ShippingKStreamConfiguration {
@StreamListener
fun process(@Input("input") input: KStream<Int, Customer> {}
}
奇怪的是,这样一个简单的 KStream 声明生成了这个复杂的拓扑结构:
2019-04-30 23:47:03.881 DEBUG 2944 --- [ main] o.s.k.config.StreamsBuilderFactoryBean : Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [customer])
--> KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
--> KSTREAM-BRANCH-0000000003, KSTREAM-PROCESSOR-0000000002
<-- KSTREAM-SOURCE-0000000000
Processor: KSTREAM-BRANCH-0000000003 (stores: [])
--> KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005
<-- KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-BRANCHCHILD-0000000004 (stores: [])
--> KSTREAM-MAPVALUES-0000000007
<-- KSTREAM-BRANCH-0000000003
Processor: KSTREAM-BRANCHCHILD-0000000005 (stores: [])
--> KSTREAM-PROCESSOR-0000000006
<-- KSTREAM-BRANCH-0000000003
Processor: KSTREAM-MAPVALUES-0000000007 (stores: [])
--> none
<-- KSTREAM-BRANCHCHILD-0000000004
Processor: KSTREAM-PROCESSOR-0000000002 (stores: [])
--> none
<-- KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
--> none
<-- KSTREAM-BRANCHCHILD-0000000005
本机 Kafka 应用程序中的相同简单流会产生更符合逻辑的拓扑:
fun main(args: Array<String>) {
val builder = StreamsBuilder()
val streamsConfiguration = Properties()
streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "kafka-shipping-service"
streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
streamsConfiguration[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"
val serdeConfig = mapOf(
AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081",
AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY to TopicRecordNameStrategy::class.java.name
)
//val byteArraySerde = Serdes.ByteArray()
val intSerde = Serdes.IntegerSerde()
val customerSerde = SpecificAvroSerde<Customer>()
customerSerde.configure(serdeConfig, false)
val customerStream = builder.stream<Int, Customer>("customer",
Consumed.with(intSerde, customerSerde)) as KStream<Int, Customer>
val topology = builder.build()
println(topology.describe())
val streams = KafkaStreams(topology, streamsConfiguration)
streams.start()
}
拓扑:
Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [customer])
--> none
Cloud Stream Spring 生成如此复杂拓扑的原因是什么?
@codependent 在拓扑中有这些额外处理器的原因是因为您正在使用框架提供的 de/serailzers(本机解码和编码默认为 false
)。基本上,我们从 Kafka 主题接收数据作为 byte[]
,然后在内部进行转换。对于这些转换,我们会使用一些额外的处理器,因此您最终会得到更深层次的拓扑结构。
这是 Java 中的基本 StreamListener
(与上面的差不多,但使用更简单的值类型):
@StreamListener
public void process(@Input("input") KStream<Integer, String> input ) {
}
通过活页夹中开箱即用的标准设置,我能够获得与您观察到的相同的更深层次的拓扑结构。但是,当我如下所示修改应用程序的配置时,
spring.cloud.stream.kafka.streams:
binder.configuration:
default.key.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true
我的拓扑缩减如下:
2019-05-01 18:02:12.705 DEBUG 67539 --- [ main] o.s.k.config.StreamsBuilderFactoryBean : Topologies:
Sub-topology: 0
Source: KSTREAM-SOURCE-0000000000 (topics: [hello-1])
--> KSTREAM-MAPVALUES-0000000001
Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
--> none
<-- KSTREAM-SOURCE-0000000000
这仍然与您从普通 Kafka Streams 应用程序获得的拓扑不同,但事实证明我们可以在活页夹中改进以避免这种情况。简而言之,通过切换到 Kafka Streams 提供的本机解码和编码,您可以避免绑定器构建的所有这些额外级别的拓扑。
在某些情况下,您别无选择,只能依赖 Spring Cloud Stream 提供的反序列化,例如,您从基于 Spring 的生产者接收数据Cloud Stream 使用了一些特殊的序列化器。我认为您的情况是这样,因为据我所知,您的制作人基于 Spring Cloud Stream 并且使用框架提供的 Avro 序列化程序。在这种情况下,在您的处理器中使用 Kafka Stream 的 Avro Serde
将无法工作,因为这些序列化器不兼容。所以这是您的一些选择。
方法 #1:
- 让您的生产者使用 Kafka 提供的本机序列化程序。
- 然后在您的 Kafka Streams 应用程序中使用使用相同 serializer/deserializer 的 Serde。
方法 #2:
- 使用 SCSt 提供的消息序列化程序。
- 然后使用 Kafka Streams 绑定程序提供的默认 de/serialization,这是默认设置。
#2 的缺点显然是您在上面提到的,即更深层次的拓扑结构。根据您的用例和吞吐量,这可能没问题。如果这成为一个真正的性能问题,我们可以尝试在框架完成转换时简化这个过程。
综上所述,我在 Kafka 活页夹中创建了一个 issue,以便在活页夹的下一个版本中进行更改。欢迎您的反馈、建议和 up/down 投票。