Kafka Streams Topology 不同的键但相同的模式

Kafka Streams Topology different keys but same schema

我有一个 Kafka Streams 拓扑,我在其中加入了 5 个表,每个表都是在一个主题上创建的,该主题由一些 Kafka 连接器填充,这些 Kafka 连接器生成 KeyValue 事件,其中 Key 是根据相同的 Avro 模式生成的,但在我的拓扑,当我加入表时,似乎键不相同,事件如果它们 Java 相等。这一切背后的原因是什么?

它与 Confluent Schema Registry 集成。

我们使用了调试器,我们在调试时看到两个在不同主题上收到但具有相同值的键是相等的。但与此同时,如果使用在主题 A 上收到的密钥在主题 B 之上构建的商店中执行查找,它将不会匹配某些内容。

fun streamsBuilder(): StreamsBuilder {
    val streamsBuilder = StreamsBuilder()
    val productsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputWebshopProductsTopic)
    val productPricesStream = streamsBuilder.stream<Key, PriceVariantsHolder>(streamNameRepository.productsPricesStreamTopic)
    val productsRatingsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductRatingsTopic)
    val inputProductsStockStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductsStockTopic)

    val productsStockStream =
            inputProductsStockStream.map { key, value -> toKeyValue(key, productStockMapper.aStockQuantity(value)) }
    productsStockStream.to(streamNameRepository.productsStockStreamTopic)

    streamsBuilder.globalTable<Key, StockQuantity>(streamNameRepository.productsStockStreamTopic,
            Materialized.`as`(streamNameRepository.productsStockGlobalStoreTopic))

    val saleProductsTable = productsStream
            .filter { _, aggregate -> aggregate.payload != null }
            .map { key, aggregate -> toKeyValue(key, saleProductMapper.aSaleProduct(aggregate) { productsStockStore().get(Key(it)) }) }
            .mapValues { saleProduct -> log.debug("received $saleProduct"); saleProduct; }
            .groupByKey()
            .reduce({ _, saleProductAvro -> saleProductAvro }, Materialized.`as`(streamNameRepository.saleProductsStoreTopic))

    val productPricesTable = productPricesStream
            .map { key, aggregate -> toKeyValue(key, aggregate) }
            .groupByKey()
            .reduce({ _, price -> price }, Materialized.`as`(streamNameRepository.productsPricesStoreTopic))

    val productsRatingsTable = productsRatingsStream
            .map { key, aggregate -> toKeyValue(key, productRatingMapper.aProductRating(aggregate)) }
            .groupByKey()
            .reduce({ _, aggregate -> aggregate }, Materialized.`as`(streamNameRepository.productsRatingsStoreTopic))

    val productsStockTable = productsStockStream
            .map { key, aggregate -> toKeyValue(key, aggregate) }
            .groupByKey()
            .reduce { _, aggregate -> aggregate }

    val productsInNeedOfVariantStockUpdate = productsInNeedOfVariantStockUpdate(productsStockTable, saleProductsTable)

    saleProductsTable
            .outerJoin(productPricesTable, saleProductMapper::aPricedSaleProduct)
            .outerJoin(productsRatingsTable, saleProductMapper::aRatedSaleProduct)
            .outerJoin(productsStockTable, saleProductMapper::aQuantifiedSaleProduct)
            .outerJoin(productsInNeedOfVariantStockUpdate, saleProductMapper::aSaleProductWithUpdatedVariantStock)
            .toStream()
            .filter { _, saleProductAvro -> saleProductAvro.id != null }
            .mapValues { value -> log.debug("publish {}", value); value; }
            .to(streamNameRepository.outputSaleProductsTopic)

    return streamsBuilder
}

private fun <V> toKeyValue(key: Key, value: V): KeyValue<Key, V> {
    return KeyValue(Key.newBuilder(key).build(), value)
}

如果您与 Confluent Schema Registry 集成,每个主题的魔法字节将不同,因此连接将无法按预期工作(因为键比较发生在字节级别...)

有点意料之中。这个问题偶尔会出现一次,并且很难在 Kafka Streams 中本地(即内置)解决,因为 Confluent Schema Registry 是第三方工具,而 Kafka Streams 应该与它无关。

虽然有解决方法。

一种解决方法是将我们在拓扑内部接收到的每个键重新映射到一个新的键中,现在拓扑中的所有键都是使用相同的 Avro 模式(通过模式 ID 的相同 Avro 模式)生成的。

其他替代方案(不是真的更好)是 "strip the magic byte" 或对连接键使用不同的数据类型(例如,某些 POJO)。因此,所有这些方法都是相似的。