如何统计kafka Topic中每个ID出现的频率

How to count the frequency of each ID in kafka Topic

我有一个 Kafka 主题 Tranfer_History,我在其中流式传输了一个 CSV 文件。现在我想计算每个 PARTY_ID 的出现次数。然后在我想应用 transformation 之后:如果计数小于 20,则将其放入新主题 CHURN,如果大于 20,则将其放入新主题主题 忠诚 #我正在使用JAVA

public class FirstFilterer {

public static void main(String[] args) {

    final StreamsBuilder builder = new StreamsBuilder();

    /*input messages example
     {"155555","11.11.2016 11:12}
     {"155555","11.11.2016 13:12}
     {"155556","11.11.2016 13:12}
     result to be achived:
     {"155555","2"}
     {"155556","1"}
     */
    builder.stream("test_topic_3")
//                .map()
                  .groupByKey()
//                .windowedBy(Window) // This may or may not be required
                  .count()
                  .toStream()
                  .map(
                    (key,count) -> new KeyValue<>(key.toString(),count)
            )
            .to("test_output_filtered_3");//this topic is empty after the run

I am new to Kafka don't know much plz help me out

这可以很容易地通过 Kafka Streams 实现。首先保证你有KStream&KTable的背景。您需要按照以下步骤操作。

 builder.<Keytype, ValueType>stream(YourInputTopic))
    .map()
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofSeconds(10))) // This may or may not be required 
                                                           in your case
    .count()
    .toStream()
    .map((Windowed<String> key, Long count) -> new KeyValue<>(key.key(),count.toString()))
    .filter((k,v)-> Long.parseLong(v) > 20) // This is the filter
    .to(TopicName));

注意:这只是一个伪代码,可以让您了解如何实现这一壮举