如何忽略从同一主题读取和写入不同事件类型的 Kafka Streams 应用程序中的某些类型的消息
How to ignore some kinds of messages in a Kafka Streams Application that reads and writes different event types from the same topic
假设 Spring Cloud Stream 应用程序从 order topic
创建了 KStream
。它对 OrderCreated {"id":x, "productId": y, "customerId": z}
事件感兴趣。一旦一个到达,它处理它并生成一个输出事件 OrderShipped {"id":x, "productId": y, "customerName": <, "customerAddress": z}
到相同的 order topic
.
我面临的问题是,由于它读写 from/to 相同的主题,Kafka Stream 应用程序正在尝试处理自己的写入,这没有意义。
如何阻止此应用程序处理它生成的事件?
更新: 正如 Artem Bilan 和 sobychako 指出的那样,我曾考虑过使用 KStream.filter()
但有些细节让我怀疑如何处理这个问题:
现在 KStream 应用程序如下所示:
interface ShippingKStreamProcessor {
...
@Input("order")
fun order(): KStream<String, OrderCreated>
@Output("output")
fun output(): KStream<String, OrderShipped>
KStream 配置
@StreamListener
@SendTo("output")
fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> {
订单和输出绑定都指向订单主题作为目的地。
已创建订单class:
data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) {
constructor() : this(null, null, null)
}
订单发货class
data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) {
constructor() : this(null, null, null, null)
}
我使用 JSON 作为消息格式,因此消息看起来像这样:
- 输入 - 已创建订单:
{"id":1, "productId": 7,"customerId": 20}
- 输出 - 订单已发货:
{"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}
考虑到这一点,我正在寻找过滤掉不需要的消息的最佳方法:
如果我现在只使用 KStream.filter()
,当我得到 {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}
时,我的 KStream<Int, OrderCreated>
会将 OrderShipped 事件解编为具有一些空字段的 OrderCreated 对象:OrderCreated(id:1, productId: 7, customerId: null)
.检查空字段听起来不太可靠。
可能的解决方案 可能是向使用该主题的每种 message/class 添加另一个字段 eventType = OrderCreated|OrderShipped
。即使在这种情况下,我最终也会得到一个带有属性 eventType=OrderShipped 的 OrderCreated class(记住 KStream< Int,OrderCreated >)。 这看起来像是一个丑陋的解决方法。有改进的想法吗?
是否有另一种更自动的方法来处理这个问题?例如,如果消息不符合预期的模式 (OrderCreated),另一种序列化 (AVRO?) 是否会阻止消息被处理?
根据这篇文章,这种在同一主题中支持多个模式(事件类型)的方式似乎是一种很好的做法:https://www.confluent.io/blog/put-several-event-types-kafka-topic/
然而,尚不清楚 unmarshall/deserialize 不同的类型。
你可以使用Kafka的记录headers来存储记录的类型。参见 KIP-82. You can set the headers in ProducerRecord
。
处理如下:
- 从主题中读取
KStream<Integer, Bytes>
类型的 stream
,值为 serde Serdes.BytesSerde
。
使用 KStream#transformValues()
to filter and create the objects. More specifically, within transformValues()
you can access the ProcessorContext
可以访问包含有关记录类型信息的记录 headers。那么:
- 如果类型是
OrderShipped
, return null
.
- 否则从
Bytes
object 和 return 创建一个 OrderCreated
object。
对于 AVRO 的解决方案,您可能需要查看以下文档
我接受了 Bruno 的回答作为解决这个问题的有效方法。但是我想我想出了一个更 straightforward/logical 的方法,使用用 JsonTypeInfo
.
注释的事件层次结构
首先,您需要为订单事件设置一个基础 class,并指定所有子class。请注意,将有一个类型 属性 添加到 JSON 文档中,这将有助于 Jackson marshal/unmarshal DTO:
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(value = [
JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
])
abstract class OrderEvent
data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() {
constructor() : this(null, null, null)
}
data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () {
constructor() : this(null, null, null, null)
}
有了这个,OrderCreatedEvent 对象的生产者将生成如下消息:
key: 1 value: {"type":"orderCreated","id":1,"productId":24,"customerId":1}
现在轮到KStream了。我已将签名更改为 KStream<Int, OrderEvent>
,因为它可以接收 OrderCreatedEvent 或 OrderShippedEvent。在接下来的两行中...
orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
...我过滤以仅保留 OrderCreatedEvent class 的消息并将它们映射以将 KStream<Int, OrderEvent>
转换为 KStream<Int, OrderCreatedEvent>
完整的 KStream 逻辑:
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> {
val intSerde = Serdes.IntegerSerde()
val customerSerde = JsonSerde<Customer>(Customer::class.java)
val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)
.withValueSerde(customerSerde)
val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
.reduce({ _, y -> y }, stateStore)
return (orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
.selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
.join(customerTable, { orderIt, customer ->
OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
}, Joined.with(intSerde, orderCreatedSerde, customerSerde))
.selectKey { _, value -> value.id }
//.to("order", Produced.with(intSerde, orderShippedSerde))
}
在此过程之后,我将在订单主题中生成一条新消息 key: 1 value: {"type":"orderShipped","id":1,"productId":24,"customerName":"Anna","customerAddress":"Cipress Street"}
,但这将被流过滤掉。
假设 Spring Cloud Stream 应用程序从 order topic
创建了 KStream
。它对 OrderCreated {"id":x, "productId": y, "customerId": z}
事件感兴趣。一旦一个到达,它处理它并生成一个输出事件 OrderShipped {"id":x, "productId": y, "customerName": <, "customerAddress": z}
到相同的 order topic
.
我面临的问题是,由于它读写 from/to 相同的主题,Kafka Stream 应用程序正在尝试处理自己的写入,这没有意义。
如何阻止此应用程序处理它生成的事件?
更新: 正如 Artem Bilan 和 sobychako 指出的那样,我曾考虑过使用 KStream.filter()
但有些细节让我怀疑如何处理这个问题:
现在 KStream 应用程序如下所示:
interface ShippingKStreamProcessor {
...
@Input("order")
fun order(): KStream<String, OrderCreated>
@Output("output")
fun output(): KStream<String, OrderShipped>
KStream 配置
@StreamListener
@SendTo("output")
fun process(..., @Input("order") order: KStream<Int, OrderCreated>): KStream<Int, OrderShipped> {
订单和输出绑定都指向订单主题作为目的地。
已创建订单class:
data class OrderCreated(var id: Int?, var productId: Int?, var customerId: Int?) {
constructor() : this(null, null, null)
}
订单发货class
data class OrderShipped(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) {
constructor() : this(null, null, null, null)
}
我使用 JSON 作为消息格式,因此消息看起来像这样:
- 输入 - 已创建订单:
{"id":1, "productId": 7,"customerId": 20}
- 输出 - 订单已发货:
{"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}
考虑到这一点,我正在寻找过滤掉不需要的消息的最佳方法:
如果我现在只使用 KStream.filter()
,当我得到 {"id":1, "productId": 7, "customerName": "X", "customerAddress": "Y"}
时,我的 KStream<Int, OrderCreated>
会将 OrderShipped 事件解编为具有一些空字段的 OrderCreated 对象:OrderCreated(id:1, productId: 7, customerId: null)
.检查空字段听起来不太可靠。
可能的解决方案 可能是向使用该主题的每种 message/class 添加另一个字段 eventType = OrderCreated|OrderShipped
。即使在这种情况下,我最终也会得到一个带有属性 eventType=OrderShipped 的 OrderCreated class(记住 KStream< Int,OrderCreated >)。 这看起来像是一个丑陋的解决方法。有改进的想法吗?
是否有另一种更自动的方法来处理这个问题?例如,如果消息不符合预期的模式 (OrderCreated),另一种序列化 (AVRO?) 是否会阻止消息被处理? 根据这篇文章,这种在同一主题中支持多个模式(事件类型)的方式似乎是一种很好的做法:https://www.confluent.io/blog/put-several-event-types-kafka-topic/ 然而,尚不清楚 unmarshall/deserialize 不同的类型。
你可以使用Kafka的记录headers来存储记录的类型。参见 KIP-82. You can set the headers in ProducerRecord
。
处理如下:
- 从主题中读取
KStream<Integer, Bytes>
类型的stream
,值为 serdeSerdes.BytesSerde
。 使用
KStream#transformValues()
to filter and create the objects. More specifically, withintransformValues()
you can access theProcessorContext
可以访问包含有关记录类型信息的记录 headers。那么:- 如果类型是
OrderShipped
, returnnull
. - 否则从
Bytes
object 和 return 创建一个OrderCreated
object。
- 如果类型是
对于 AVRO 的解决方案,您可能需要查看以下文档
我接受了 Bruno 的回答作为解决这个问题的有效方法。但是我想我想出了一个更 straightforward/logical 的方法,使用用 JsonTypeInfo
.
首先,您需要为订单事件设置一个基础 class,并指定所有子class。请注意,将有一个类型 属性 添加到 JSON 文档中,这将有助于 Jackson marshal/unmarshal DTO:
@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, include = JsonTypeInfo.As.PROPERTY, property = "type")
@JsonSubTypes(value = [
JsonSubTypes.Type(value = OrderCreatedEvent::class, name = "orderCreated"),
JsonSubTypes.Type(value = OrderShippedEvent::class, name = "orderShipped")
])
abstract class OrderEvent
data class OrderCreatedEvent(var id: Int?, var productId: Int?, var customerId: Int?) : OrderEvent() {
constructor() : this(null, null, null)
}
data class OrderShippedEvent(var id: Int?, var productId: Int?, var customerName: String?, var customerAddress: String?) : OrderEvent () {
constructor() : this(null, null, null, null)
}
有了这个,OrderCreatedEvent 对象的生产者将生成如下消息:
key: 1 value: {"type":"orderCreated","id":1,"productId":24,"customerId":1}
现在轮到KStream了。我已将签名更改为 KStream<Int, OrderEvent>
,因为它可以接收 OrderCreatedEvent 或 OrderShippedEvent。在接下来的两行中...
orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
...我过滤以仅保留 OrderCreatedEvent class 的消息并将它们映射以将 KStream<Int, OrderEvent>
转换为 KStream<Int, OrderCreatedEvent>
完整的 KStream 逻辑:
@StreamListener
@SendTo("output")
fun process(@Input("input") input: KStream<Int, Customer>, @Input("order") orderEvent: KStream<Int, OrderEvent>): KStream<Int, OrderShippedEvent> {
val intSerde = Serdes.IntegerSerde()
val customerSerde = JsonSerde<Customer>(Customer::class.java)
val orderCreatedSerde = JsonSerde<OrderCreatedEvent>(OrderCreatedEvent::class.java)
val stateStore: Materialized<Int, Customer, KeyValueStore<Bytes, ByteArray>> =
Materialized.`as`<Int, Customer, KeyValueStore<Bytes, ByteArray>>("customer-store")
.withKeySerde(intSerde)
.withValueSerde(customerSerde)
val customerTable: KTable<Int, Customer> = input.groupByKey(Serialized.with(intSerde, customerSerde))
.reduce({ _, y -> y }, stateStore)
return (orderEvent.filter { _, value -> value is OrderCreatedEvent }
.map { key, value -> KeyValue(key, value as OrderCreatedEvent) }
.selectKey { _, value -> value.customerId } as KStream<Int, OrderCreatedEvent>)
.join(customerTable, { orderIt, customer ->
OrderShippedEvent(orderIt.id, orderIt.productId, customer.name, customer.address)
}, Joined.with(intSerde, orderCreatedSerde, customerSerde))
.selectKey { _, value -> value.id }
//.to("order", Produced.with(intSerde, orderShippedSerde))
}
在此过程之后,我将在订单主题中生成一条新消息 key: 1 value: {"type":"orderShipped","id":1,"productId":24,"customerName":"Anna","customerAddress":"Cipress Street"}
,但这将被流过滤掉。