Spring Cloud Stream 生成不必要的复杂 Kafka 拓扑,为什么?

Spring Cloud Stream generates unnecessary complex Kafka topologies, why?

我有一个 KStream 应用程序,其中包含大量 KStreams、连接和其他操作。我启用了 logging.level.org.springframework.kafka.config=debug 来验证正在生成的 Topology,并发现了很多根本没有意义的节点。

然后我将应用程序简化为:

interface ShippingKStreamProcessor {

    @Input("input")
    fun input(): KStream<Int, Customer>

}

@Suppress("UNCHECKED_CAST")
@Configuration
class ShippingKStreamConfiguration {

    @StreamListener
    fun process(@Input("input") input: KStream<Int, Customer> {}

}

奇怪的是,这样一个简单的 KStream 声明生成了这个复杂的拓扑结构:

2019-04-30 23:47:03.881 DEBUG 2944 --- [           main] o.s.k.config.StreamsBuilderFactoryBean   : Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [customer])
      --> KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
      --> KSTREAM-BRANCH-0000000003, KSTREAM-PROCESSOR-0000000002
      <-- KSTREAM-SOURCE-0000000000
    Processor: KSTREAM-BRANCH-0000000003 (stores: [])
      --> KSTREAM-BRANCHCHILD-0000000004, KSTREAM-BRANCHCHILD-0000000005
      <-- KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-BRANCHCHILD-0000000004 (stores: [])
      --> KSTREAM-MAPVALUES-0000000007
      <-- KSTREAM-BRANCH-0000000003
    Processor: KSTREAM-BRANCHCHILD-0000000005 (stores: [])
      --> KSTREAM-PROCESSOR-0000000006
      <-- KSTREAM-BRANCH-0000000003
    Processor: KSTREAM-MAPVALUES-0000000007 (stores: [])
      --> none
      <-- KSTREAM-BRANCHCHILD-0000000004
    Processor: KSTREAM-PROCESSOR-0000000002 (stores: [])
      --> none
      <-- KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-PROCESSOR-0000000006 (stores: [])
      --> none
      <-- KSTREAM-BRANCHCHILD-0000000005

本机 Kafka 应用程序中的相同简单流会产生更符合逻辑的拓扑:

fun main(args: Array<String>) {

    val builder = StreamsBuilder()

    val streamsConfiguration = Properties()
    streamsConfiguration[StreamsConfig.APPLICATION_ID_CONFIG] = "kafka-shipping-service"
    streamsConfiguration[StreamsConfig.BOOTSTRAP_SERVERS_CONFIG] = "http://localhost:9092"
    streamsConfiguration[AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG] = "http://localhost:8081"

    val serdeConfig = mapOf(
        AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG to "http://localhost:8081",
        AbstractKafkaAvroSerDeConfig.VALUE_SUBJECT_NAME_STRATEGY to TopicRecordNameStrategy::class.java.name
    )

    //val byteArraySerde = Serdes.ByteArray()
    val intSerde = Serdes.IntegerSerde()
    val customerSerde = SpecificAvroSerde<Customer>()
    customerSerde.configure(serdeConfig, false)

    val customerStream = builder.stream<Int, Customer>("customer",
        Consumed.with(intSerde, customerSerde)) as KStream<Int, Customer>

    val topology = builder.build()
    println(topology.describe())

    val streams = KafkaStreams(topology, streamsConfiguration)
    streams.start()
}

拓扑:

Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [customer])
      --> none

Cloud Stream Spring 生成如此复杂拓扑的原因是什么?

@codependent 在拓扑中有这些额外处理器的原因是因为您正在使用框架提供的 de/serailzers(本机解码和编码默认为 false)。基本上,我们从 Kafka 主题接收数据作为 byte[],然后在内部进行转换。对于这些转换,我们会使用一些额外的处理器,因此您最终会得到更深层次的拓扑结构。

这是 Java 中的基本 StreamListener(与上面的差不多,但使用更简单的值类型):

@StreamListener
public void process(@Input("input") KStream<Integer, String> input ) {

}

通过活页夹中开箱即用的标准设置,我能够获得与您观察到的相同的更深层次的拓扑结构。但是,当我如下所示修改应用程序的配置时,

spring.cloud.stream.kafka.streams:
  binder.configuration:
    default.key.serde: org.apache.kafka.common.serialization.Serdes$IntegerSerde
    default.value.serde: org.apache.kafka.common.serialization.Serdes$StringSerde
spring.cloud.stream.bindings.input.consumer.useNativeDecoding: true

我的拓扑缩减如下:

2019-05-01 18:02:12.705 DEBUG 67539 --- [           main] o.s.k.config.StreamsBuilderFactoryBean   : Topologies:
   Sub-topology: 0
    Source: KSTREAM-SOURCE-0000000000 (topics: [hello-1])
      --> KSTREAM-MAPVALUES-0000000001
    Processor: KSTREAM-MAPVALUES-0000000001 (stores: [])
      --> none
      <-- KSTREAM-SOURCE-0000000000

这仍然与您从普通 Kafka Streams 应用程序获得的拓扑不同,但事实证明我们可以在活页夹中改进以避免这种情况。简而言之,通过切换到 Kafka Streams 提供的本机解码和编码,您可以避免绑定器构建的所有这些额外级别的拓扑。

在某些情况下,您别无选择,只能依赖 Spring Cloud Stream 提供的反序列化,例如,您从基于 Spring 的生产者接收数据Cloud Stream 使用了一些特殊的序列化器。我认为您的情况是这样,因为据我所知,您的制作人基于 Spring Cloud Stream 并且使用框架提供的 Avro 序列化程序。在这种情况下,在您的处理器中使用 Kafka Stream 的 Avro Serde 将无法工作,因为这些序列化器不兼容。所以这是您的一些选择。

方法 #1:

  1. 让您的生产者使用 Kafka 提供的本机序列化程序。
  2. 然后在您的 Kafka Streams 应用程序中使用使用相同 serializer/deserializer 的 Serde。

方法 #2:

  1. 使用 SCSt 提供的消息序列化程序。
  2. 然后使用 Kafka Streams 绑定程序提供的默认 de/serialization,这是默认设置。

#2 的缺点显然是您在上面提到的,即更深层次的拓扑结构。根据您的用例和吞吐量,这可能没问题。如果这成为一个真正的性能问题,我们可以尝试在框架完成转换时简化这个过程。

综上所述,我在 Kafka 活页夹中创建了一个 issue,以便在活页夹的下一个版本中进行更改。欢迎您的反馈、建议和 up/down 投票。