使用kafka流根据消息密钥向主题发送消息
send message to topic based on message key using kafka streams
我希望能够根据消息键的键将Kafkastream中的所有记录发送到不同的主题。
前任。 Kafka 中的流包含名称作为键和记录作为值。我想根据记录的键将这些记录扇出到不同的主题
数据:(jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}),
预计
- topic1 :name: jhon ->(jhon -> {jhonsRecord}),(jhon -> {jhonsRecord2})
- topic2 :sean-> (sean -> {seansRecord})
- topic3 :mary -> (mary -> {marysRecord})
以下是我现在执行此操作的方式,但由于名称列表很慢,所以速度很慢。再加上即使记录了几个名字,也需要遍历整个列表 请指正
for( String name : names )
{
recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
}
我想你要找的是KStream#branch
。
以下未经测试,但它显示了总体思路
// get a list of predicates to branch a topic on
final List<String> names = Arrays.asList("jhon", "sean", "mary");
final Predicate[] predicates = names.stream()
.map((Function<String, Predicate<String, Object>>) n -> (s, o) -> s.equals(n))
.toArray(Predicate[]::new);
// example input
final KStream<Object, Object> stream = new StreamsBuilder().stream("names");
// split the topic
KStream<String, Object>[] branches = stream.branch(predicates);
for (int i = 0; i < names.size(); i++) {
branches[i].to(names.get(i));
}
// KStream branches[0] contains all records whose keys are "jhon"
// KStream branches[1] contains all records whose keys are "sean"
...
我认为你应该使用 KStream::to(final TopicNameExtractor<K, V> topicExtractor)
函数。它使您能够计算每条消息的主题名称。
示例代码:
final KStream<String, String> stream = ???;
stream.to((key, value, recordContext) -> key);
如果您需要为每个用户生成聚合数据,则不需要为每个用户编写单独的主题。你最好在源流上写一个聚合。这样您就不会以每个键一个主题而告终,但您仍然可以 运行 独立地对每个用户进行操作。
Serde<UserRecord> recordSerde = ...
KStream<Stream, UserAggregate> aggregateByName = recordsByName
.groupByKey(Grouped.with(Serdes.String(), recordSerde))
.aggregate(...)
.toStream()
详情见https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#aggregating
这种方法将扩展到数百万用户,您目前无法通过每个用户一个主题的方法实现这一目标。
我希望能够根据消息键的键将Kafkastream中的所有记录发送到不同的主题。 前任。 Kafka 中的流包含名称作为键和记录作为值。我想根据记录的键将这些记录扇出到不同的主题
数据:(jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}), 预计
- topic1 :name: jhon ->(jhon -> {jhonsRecord}),(jhon -> {jhonsRecord2})
- topic2 :sean-> (sean -> {seansRecord})
- topic3 :mary -> (mary -> {marysRecord})
以下是我现在执行此操作的方式,但由于名称列表很慢,所以速度很慢。再加上即使记录了几个名字,也需要遍历整个列表 请指正
for( String name : names )
{
recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
}
我想你要找的是KStream#branch
。
以下未经测试,但它显示了总体思路
// get a list of predicates to branch a topic on
final List<String> names = Arrays.asList("jhon", "sean", "mary");
final Predicate[] predicates = names.stream()
.map((Function<String, Predicate<String, Object>>) n -> (s, o) -> s.equals(n))
.toArray(Predicate[]::new);
// example input
final KStream<Object, Object> stream = new StreamsBuilder().stream("names");
// split the topic
KStream<String, Object>[] branches = stream.branch(predicates);
for (int i = 0; i < names.size(); i++) {
branches[i].to(names.get(i));
}
// KStream branches[0] contains all records whose keys are "jhon"
// KStream branches[1] contains all records whose keys are "sean"
...
我认为你应该使用 KStream::to(final TopicNameExtractor<K, V> topicExtractor)
函数。它使您能够计算每条消息的主题名称。
示例代码:
final KStream<String, String> stream = ???;
stream.to((key, value, recordContext) -> key);
如果您需要为每个用户生成聚合数据,则不需要为每个用户编写单独的主题。你最好在源流上写一个聚合。这样您就不会以每个键一个主题而告终,但您仍然可以 运行 独立地对每个用户进行操作。
Serde<UserRecord> recordSerde = ...
KStream<Stream, UserAggregate> aggregateByName = recordsByName
.groupByKey(Grouped.with(Serdes.String(), recordSerde))
.aggregate(...)
.toStream()
详情见https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#aggregating
这种方法将扩展到数百万用户,您目前无法通过每个用户一个主题的方法实现这一目标。