Kafka Streams - missing source topic
I'm working on a Kafka Streams topology and sometimes, after changing the applicationId and/or clientId properties, I get an error on a particular Kafka stream: "Missing source topic stream.webshop.products.prices.5 during assignment. Returning error INCOMPLETE_SOURCE_TOPIC_METADATA". I've set the create.topic=true property in server.properties on every Kafka node, but the topic for this stream does not seem to get created.
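(Note: as far as I know there is no create.topic key among the broker settings; the server.properties property that controls automatic topic creation is auto.create.topics.enable. A minimal excerpt, assuming that is the intended setting:

# server.properties (assumed intent)
# Broker-side switch that lets Kafka create topics on first use.
auto.create.topics.enable=true

Even with automatic creation enabled, Kafka Streams will not create source topics itself, as the answer below explains.)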
Here is my Kafka Streams topology:
package ro.orange.eshop.productindexer.domain

import org.apache.kafka.streams.KafkaStreams
import org.apache.kafka.streams.StreamsBuilder
import org.apache.kafka.streams.kstream.Materialized
import org.apache.kafka.streams.kstream.Printed
import ro.orange.digital.avro.Aggregate
import ro.orange.digital.avro.Key
import ro.orange.digital.avro.Price
import ro.orange.digital.avro.StockQuantity
import ro.orange.eshop.productindexer.infrastructure.configuration.kafka.makeStoreProvider
import java.util.concurrent.CompletableFuture

class SaleProductTopology(
        private val streamNameRepository: IStreamNameRepository,
        private val saleProductMapper: ISaleProductMapper,
        private val productRatingMapper: IProductRatingMapper,
        private val productStockMapper: IProductStockMapper,
        private val lazyKafkaStreams: CompletableFuture<KafkaStreams>
) {
    fun streamsBuilder(): StreamsBuilder {
        val streamsBuilder = StreamsBuilder()

        val productsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputWebshopProductsTopic)
        val productPricesStream = streamsBuilder.stream<Key, Price>(streamNameRepository.productsPricesStreamTopic)
        val productsRatingsStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductRatingsTopic)
        val inputProductsStockStream = streamsBuilder.stream<Key, Aggregate>(streamNameRepository.inputProductsStockTopic)

        val productsStockStream = inputProductsStockStream
                .mapValues(productStockMapper::aStockQuantity)
        productsStockStream.to(streamNameRepository.productsStockStreamTopic)

        streamsBuilder.globalTable<Key, StockQuantity>(
                streamNameRepository.productsStockStreamTopic,
                Materialized.`as`(streamNameRepository.productsStockGlobalStoreTopic)
        )
        val quantityProvider = lazyKafkaStreams.makeStoreProvider<StockQuantity>(streamNameRepository.productsStockGlobalStoreTopic)

        val saleProductsTable = productsStream
                .groupByKey()
                .reduce({ _, aggregate -> aggregate }, Materialized.`as`(streamNameRepository.saleProductsStoreTopic))
                .mapValues { aggregate -> saleProductMapper.aSaleProduct(aggregate, quantityProvider) }
        saleProductsTable.toStream().print(Printed.toSysOut())

        val productPricesTable = productPricesStream
                .groupByKey()
                .reduce({ _, price -> price }, Materialized.`as`(streamNameRepository.productsPricesStoreTopic))
        productPricesTable.toStream().print(Printed.toSysOut())

        val productsRatingsTable = productsRatingsStream
                .groupByKey()
                .reduce({ _, aggregate -> aggregate }, Materialized.`as`(streamNameRepository.productsRatingsStoreTopic))
                .mapValues { aggregate -> productRatingMapper.aProductRating(aggregate) }
        productsRatingsTable.toStream().print(Printed.toSysOut())

        val productsStockTable = productsStockStream
                .groupByKey()
                .reduce { _, aggregate -> aggregate }

        saleProductsTable
                .leftJoin(productPricesTable) { saleProduct, price -> saleProductMapper.aPricedSaleProduct(saleProduct, price) }
                .leftJoin(productsRatingsTable) { saleProduct, rating -> saleProductMapper.aRatedSaleProduct(saleProduct, rating) }
                .leftJoin(productsStockTable) { saleProduct, stockQuantity -> saleProductMapper.aQuantifiedSaleProduct(saleProduct, stockQuantity) }
                .mapValues { saleProduct -> AggregateMapper.aSaleProductAggregate(saleProduct) }
                .toStream()
                .to(streamNameRepository.saleProductsTopic)

        return streamsBuilder
    }
}
As @jacek-laskowski wrote:
KafkaStreams won't create it since it's a source
This is by design: if one of the source topics were created automatically, it would get the default number of partitions, while a second topic created up front by the user could have a different partition count. When KStreams/KTables are joined, they must have the same number of partitions; this is a key assumption.
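One way to check that co-partitioning requirement is to compare the partition counts of the source topics with Kafka's AdminClient. A minimal sketch, assuming a broker at localhost:9092; the second topic name is a made-up join partner, not taken from the question:

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker address
    }
    AdminClient.create(props).use { admin ->
        // Topics that take part in a join must have identical partition counts.
        val topics = listOf(
                "stream.webshop.products.prices.5", // topic from the error message
                "stream.webshop.products"           // hypothetical join partner
        )
        admin.describeTopics(topics).all().get().forEach { (name, description) ->
            println("$name -> ${description.partitions().size} partitions")
        }
    }
}

If the printed counts differ, the joins will not be valid even once the missing topic exists.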
The user has to consciously create the topics with an appropriate number of partitions (the number of stream processing threads is one way of controlling the performance of a Kafka Streams application).
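Concretely, that means creating the missing source topic yourself before starting the application, with a partition count that matches the topics it is joined with. A sketch using the AdminClient; the partition count of 8 and replication factor of 3 are illustrative assumptions, so match them to your cluster:

import org.apache.kafka.clients.admin.AdminClient
import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import java.util.Properties

fun main() {
    val props = Properties().apply {
        put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker address
    }
    AdminClient.create(props).use { admin ->
        // The partition count must equal that of the join partners;
        // 8 partitions and replication factor 3 are assumed values.
        val topic = NewTopic("stream.webshop.products.prices.5", 8, 3.toShort())
        admin.createTopics(listOf(topic)).all().get()
    }
}

The same can be done from the command line with kafka-topics.sh --create --topic stream.webshop.products.prices.5 --partitions 8 --replication-factor 3 (plus --bootstrap-server or --zookeeper, depending on the Kafka version).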