KTable 作为带有空键的主题的输入
KTable as input from topic with null keys
我是 Kafka Streams 的新手,遇到了一个问题。
我有两个 tables - 一个用于长期数据 (descriptions),另一个用于实时数据 (live).他们有一个共同的 id.
想法是存储来自 descriptions 的数据(大概在 KTable 中,为每个 id 保留最新的描述)和新消息时出现在 live - 与来自 descriptions 的数据结合在相应的 id 上并进一步发送。
为简单起见,让我们将所有类型设为字符串。
所以我看到的每个教程的基本思想都是这样的:
interface Processor {
@Input("live")
KStream<String, String> input();
@Input("descriptions")
KTable<String, String> input();
@Output("output")
KStream<String, String> output();
}
然后:
@StreamListener
@SendTo("output")
public KStream<String, String> process(
@Input("live") KStream<String, String> live,
@Input("descriptions") KTable<String, String> descriptions) {
// ...
}
问题是 descriptions 主题不是 KTable-suitable(空键,只是消息)。
所以我不能将它用作输入,也不能创建任何新的中间主题来存储此 table 中的有效流(基本上是只读的)。
我正在搜索某种内存中的绑定目的地,但无济于事。
我认为可能的方式是创建一个 中间 输出,它只在内存中存储 KTable 或其他东西,然后使用这个 中间 作为 实时 处理中的输入。喜欢:
@StreamListener("descriptions")
@SendTo("intermediate")
public KTable<String, String> process(@Input("descriptions") KStream<String, String> descriptions) {
// ...
}
希望这种绑定语义是可能的。
我认为你可以尝试通过引入初始处理器来引入存储 key/value 的中间主题。然后将该流用作常规处理器中输入的 table。这里有一些模板。我正在使用 Spring Cloud Stream 中的新功能模型来编写这些处理器。
@Bean
public Function<KStream<String, String>, KStream<String, String>> processDescriptions() {
return descriptions ->
descriptions.map((key, value) -> {
Pojo p = parseIntoPojo(value);
return new KeyValue<>(p.getId(), value);
})
.groupByKey()
.reduce((v1, v2) -> v2)
.toStream();
}
@Bean
public BiFunction<KStream<String, String>, KTable<String, String>, KStream<String, String>> realStream() {
return (live, description) -> {
}
}
第一个处理器接收 description
作为 KStream
,然后用密钥丰富它,然后输出为 KStream
。现在这个主题既有键又有值,我们可以在下一个处理器中将其用作 KTable
。下一个处理器是 java.util.function.BiFunction
,它接收两个输入并生成一个输出。输入分别是 KStream
和 KTable
,输出是 KStream
。
您可以按如下方式在它们上设置目的地:
spring.cloud.stream.function.definition=prorcessDescriptions;realStream
spring.cloud.stream.bindings.processDescriptions-in-0.destinaion=description-topic
spring.cloud.stream.bindings.processDescriptions-out-0.destinaion=description-table-topic
spring.cloud.stream.bindings.realStream-in-0.destinaion=live-topic
spring.cloud.stream.bindings.realStream-in-1.destinaion=description-table-topic
spring.cloud.stream.bindings.realStream-out-0.destinaion=output
您也可以使用 StreamListener
方法获得相同的结果。
这种方法的缺点是你需要在 Kafka 中维护一个额外的中间主题,但如果你真的想要它作为一个 KTable
并且底层信息是非键控的,我不认为这里的选项太多了。
如果您不需要顶级描述 KTable
,您可以以某种方式将其存储在状态存储中,然后查询将所有存储在单个处理器中。我还没有尝试过,所以你需要尝试一下这个想法。基本上,你会得到两个流,直播和描述
(live, descriptions) -> Reduce key/value for descriptions and keep that in a state store.
Then, do the processing on live by joining with what is in the state store.
Kafka Streams 允许多种方式来完成类似的事情。查看他们的参考文档以获取更多信息。
希望这对您有所帮助。
我是 Kafka Streams 的新手,遇到了一个问题。
我有两个 tables - 一个用于长期数据 (descriptions),另一个用于实时数据 (live).他们有一个共同的 id.
想法是存储来自 descriptions 的数据(大概在 KTable 中,为每个 id 保留最新的描述)和新消息时出现在 live - 与来自 descriptions 的数据结合在相应的 id 上并进一步发送。
为简单起见,让我们将所有类型设为字符串。
所以我看到的每个教程的基本思想都是这样的:
interface Processor {
@Input("live")
KStream<String, String> input();
@Input("descriptions")
KTable<String, String> input();
@Output("output")
KStream<String, String> output();
}
然后:
@StreamListener
@SendTo("output")
public KStream<String, String> process(
@Input("live") KStream<String, String> live,
@Input("descriptions") KTable<String, String> descriptions) {
// ...
}
问题是 descriptions 主题不是 KTable-suitable(空键,只是消息)。
所以我不能将它用作输入,也不能创建任何新的中间主题来存储此 table 中的有效流(基本上是只读的)。
我正在搜索某种内存中的绑定目的地,但无济于事。
我认为可能的方式是创建一个 中间 输出,它只在内存中存储 KTable 或其他东西,然后使用这个 中间 作为 实时 处理中的输入。喜欢:
@StreamListener("descriptions")
@SendTo("intermediate")
public KTable<String, String> process(@Input("descriptions") KStream<String, String> descriptions) {
// ...
}
希望这种绑定语义是可能的。
我认为你可以尝试通过引入初始处理器来引入存储 key/value 的中间主题。然后将该流用作常规处理器中输入的 table。这里有一些模板。我正在使用 Spring Cloud Stream 中的新功能模型来编写这些处理器。
@Bean
public Function<KStream<String, String>, KStream<String, String>> processDescriptions() {
return descriptions ->
descriptions.map((key, value) -> {
Pojo p = parseIntoPojo(value);
return new KeyValue<>(p.getId(), value);
})
.groupByKey()
.reduce((v1, v2) -> v2)
.toStream();
}
@Bean
public BiFunction<KStream<String, String>, KTable<String, String>, KStream<String, String>> realStream() {
return (live, description) -> {
}
}
第一个处理器接收 description
作为 KStream
,然后用密钥丰富它,然后输出为 KStream
。现在这个主题既有键又有值,我们可以在下一个处理器中将其用作 KTable
。下一个处理器是 java.util.function.BiFunction
,它接收两个输入并生成一个输出。输入分别是 KStream
和 KTable
,输出是 KStream
。
您可以按如下方式在它们上设置目的地:
spring.cloud.stream.function.definition=prorcessDescriptions;realStream
spring.cloud.stream.bindings.processDescriptions-in-0.destinaion=description-topic
spring.cloud.stream.bindings.processDescriptions-out-0.destinaion=description-table-topic
spring.cloud.stream.bindings.realStream-in-0.destinaion=live-topic
spring.cloud.stream.bindings.realStream-in-1.destinaion=description-table-topic
spring.cloud.stream.bindings.realStream-out-0.destinaion=output
您也可以使用 StreamListener
方法获得相同的结果。
这种方法的缺点是你需要在 Kafka 中维护一个额外的中间主题,但如果你真的想要它作为一个 KTable
并且底层信息是非键控的,我不认为这里的选项太多了。
如果您不需要顶级描述 KTable
,您可以以某种方式将其存储在状态存储中,然后查询将所有存储在单个处理器中。我还没有尝试过,所以你需要尝试一下这个想法。基本上,你会得到两个流,直播和描述
(live, descriptions) -> Reduce key/value for descriptions and keep that in a state store.
Then, do the processing on live by joining with what is in the state store.
Kafka Streams 允许多种方式来完成类似的事情。查看他们的参考文档以获取更多信息。
希望这对您有所帮助。