使用kafka流根据消息密钥向主题发送消息

send message to topic based on message key using kafka streams

我希望能够根据消息键的键将Kafkastream中的所有记录发送到不同的主题。 前任。 Kafka 中的流包含名称作为键和记录作为值。我想根据记录的键将这些记录扇出到不同的主题

数据:(jhon -> {jhonsRecord}),(sean -> {seansRecord}),(mary -> {marysRecord}),(jhon -> {jhonsRecord2}), 预计

以下是我现在执行此操作的方式,但由于名称列表很慢,所以速度很慢。再加上即使记录了几个名字,也需要遍历整个列表 请指正

    for( String name : names )
    {
        recordsByName.filterNot(( k, v ) -> k.equalsIgnoreCase(name)).to(name);
    } 

我想你要找的是KStream#branch

以下未经测试,但它显示了总体思路

// get a list of predicates to branch a topic on
final List<String> names = Arrays.asList("jhon", "sean", "mary");
final Predicate[] predicates = names.stream()
    .map((Function<String, Predicate<String, Object>>) n -> (s, o) -> s.equals(n))
    .toArray(Predicate[]::new);

// example input
final KStream<Object, Object> stream = new StreamsBuilder().stream("names");

// split the topic
KStream<String, Object>[] branches = stream.branch(predicates);
for (int i = 0; i < names.size(); i++) {
    branches[i].to(names.get(i));
}

// KStream branches[0] contains all records whose keys are "jhon"
// KStream branches[1] contains all records whose keys are "sean"
...

我认为你应该使用 KStream::to(final TopicNameExtractor<K, V> topicExtractor) 函数。它使您能够计算每条消息的主题名称。

示例代码:

final KStream<String, String> stream = ???;
stream.to((key, value, recordContext) -> key);

如果您需要为每个用户生成聚合数据,则不需要为每个用户编写单独的主题。你最好在源流上写一个聚合。这样您就不会以每个键一个主题而告终,但您仍然可以 运行 独立地对每个用户进行操作。

Serde<UserRecord> recordSerde = ...
KStream<Stream, UserAggregate> aggregateByName = recordsByName
   .groupByKey(Grouped.with(Serdes.String(), recordSerde))
   .aggregate(...)
   .toStream()

详情见https://docs.confluent.io/current/streams/developer-guide/dsl-api.html#aggregating

这种方法将扩展到数百万用户,您目前无法通过每个用户一个主题的方法实现这一目标。