Simultaneous operations on KStream & KTable
I'm trying to implement a use case in Kafka Streams where I populate a KTable by applying some filters on a stream. Let's call it the tracking table: the key is derived from the event and the value is the event itself.
For subsequent events, I check this table to verify whether they are tracked, and either update the event (if tracked) or publish it to a different topic (if not). I'm not sure how to do both at the same time. Here is what I have so far.
// Branch based on conditions
KStream<String, Event>[] segregatedRecords = branches[0]
        .branch((key, event) -> event.getStatus().getStatus().equals("A"),
                (key, event) -> event.getStatus().getStatus().equals("B"),
                (key, event) -> event.getStatus().getStatus().equals("C"));
// Store events with status A to a topic
segregatedRecords[0]
.selectKey((key, event) -> createKey(event))
.mapValues(transform)
.to(intermediateTopic);
// Load topic from previous step as a GlobalKTable
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(intermediateTopic);
// The following step is where I'm stuck, because I cannot perform conditional actions:
// if the event exists in the tracking table, update it; but if not, how do I publish it to a different topic?
segregatedRecords[1]
// derive key for lookup
.selectKey((key, event) -> createKey(event))
// update the event status in the table
.join(trackedEvents, (key, value) -> key, (event, tracked) -> modifiedEvent)
.to(intermediateTopic);
// Other events will need to refer to the latest information in the tracking table for further processing
You can do this by branching segregatedRecords[1] into 2 sub-topologies. One branch performs the table lookup (the join) as in your code, while the other branch uses the low-level Processor API (a transformValues in this case) to check whether the underlying GlobalKTable state store contains a record for the newly derived key. If a record exists, we transform the Event to a null Event and then filter out records with a null Event (because those events were already joined in your first sub-topology).
I've updated your code slightly:
//give your GlobalKTable a name to query later
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(intermediateTopic, Materialized.as("tracked_event_global_store"));
KStream<String, Event> derivedKStream = segregatedRecords[1]
// derive key for lookup
.selectKey((key, event) -> createKey(event));
// this sub-topology performs the table lookup as normal: update the event status in the table
derivedKStream.join(trackedEvents, (key, value) -> key,(event, tracked) -> modifiedEvent)
.to(intermediateTopic);
// this sub-topology checks whether the event already exists in trackedEvents; if yes, the event has already been joined,
// so we transform it to a null value and filter it out in the next step
derivedKStream.transformValues(() -> new ValueTransformerWithKey<String, Event, Event>() {
    // the underlying store of the tracked GlobalKTable
    KeyValueStore<String, Event> trackedKvStore;

    @Override
    public void init(ProcessorContext context) {
        // using the store name given above
        trackedKvStore = (KeyValueStore<String, Event>) context.getStateStore("tracked_event_global_store");
    }

    @Override
    public Event transform(String derivedKey, Event event) {
        // if the event already exists in trackedEvents, return a null event so we can filter it out in the next step
        if (trackedKvStore.get(derivedKey) != null) {
            return null;
        }
        // event does not exist in trackedEvents: keep it and send it to a different topic
        return event;
    }

    @Override
    public void close() {
    }
})
.filter((derivedKey, event) -> event != null)
.to("your-different-topic-name");
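The two routes above can be sketched in plain Java with no Kafka dependency; `TrackedRouting`, `route`, and the in-memory `trackedStore` are hypothetical stand-ins for the GlobalKTable state store and the two sub-topologies, just to make the routing semantics concrete:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TrackedRouting {
    // In-memory stand-in for the GlobalKTable state store (hypothetical).
    static final Map<String, String> trackedStore = new HashMap<>();

    // Mimics the two sub-topologies: events whose derived key is already
    // tracked take the join/update route; all others take the other-topic route.
    static Map<String, List<String>> route(List<String> events) {
        Map<String, List<String>> routes = new HashMap<>();
        routes.put("update", new ArrayList<>());
        routes.put("other-topic", new ArrayList<>());
        for (String event : events) {
            String derivedKey = "key-" + event; // stand-in for createKey(event)
            if (trackedStore.get(derivedKey) != null) {
                routes.get("update").add(event);      // join path
            } else {
                routes.get("other-topic").add(event); // transformValues + filter path
            }
        }
        return routes;
    }
}
```

Note that every event goes down exactly one of the two routes, which is why the null-then-filter trick in the real topology does not drop or duplicate records.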
Update: regarding the problem of being unable to create both a GlobalKTable and a KStream from the single intermediate topic (a topic cannot be read more than once, as described here):
- Create a dedicated input topic for the GlobalKTable (this topic must have log compaction enabled):
KStream<Object, Object> intermediateKStream = streamsBuilder.stream(intermediateTopic);
intermediateKStream.to(trackedInputTopic);
//instead of building GlobalKTable from intermediate, use this dedicated topic trackedInputTopic
GlobalKTable<String, Event> trackedEvents = streamsBuilder.globalTable(trackedInputTopic, Materialized.as("tracked_event_global_store"));
//Perform things you want to do with the intermediate topic
intermediateKStream
...
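As a side note on why the dedicated trackedInputTopic must be log-compacted: compaction keeps only the latest value per key (and drops keys whose latest value is a null tombstone), which matches the read model a GlobalKTable needs. A minimal plain-Java sketch of that semantics (class and method names are hypothetical):

```java
import java.util.AbstractMap;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class CompactionSketch {
    // Replays a key/value log the way log compaction resolves it:
    // only the latest value per key survives; a null value (tombstone)
    // removes the key entirely.
    static Map<String, String> compact(List<Map.Entry<String, String>> log) {
        Map<String, String> latest = new LinkedHashMap<>();
        for (Map.Entry<String, String> record : log) {
            if (record.getValue() == null) {
                latest.remove(record.getKey()); // tombstone deletes the key
            } else {
                latest.put(record.getKey(), record.getValue());
            }
        }
        return latest;
    }
}
```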