如何根据过滤器将csv数据拆分为两个不同的kafka主题

How to split the csv data to two different kafka topics based on filter

我有一个 Kafka 主题 Customer,我在其中流式传输了一个 .csv 文件。现在是否可以根据
转换kafka中的数据 计数。比如如果计数 小于 比 20 发送它到 topic A 并且如果计数 大于 20 发送至 主题 B
我是 kafka 的新手,我正在尝试这样但它不起作用

 builder.stream("Customer")
            .groupByKey()
            .count()
            .toStream()
            .filter((k,v)-> String(v) > 20)
            .to("test_A");

这个代码是错误的我很确定,但是请任何人帮助我

您现在正在做的是根据条件仅将一部分记录发送到一个主题,然后丢弃其他记录。如果您想将 then 的一部分发送到一个主题,而将其他部分发送到另一个主题,您应该使用分支运算符。

像这样:

KStream<K, Long>[] branches = builder.stream("Customer")
        .groupByKey()
        .count()
        .toStream()
        .branch((k, v) -> v > 20),
                (k, v) -> v <= 20);
branches[0].to("topicB");
branches[1].to("topicA");

你应该注意的另一件事是比较必须在数字之间进行,而不是字符串,因为当你对字符串使用更大的比较器时,你是在比较字符串的长度和字典顺序

简单的解决方案是创建一个生产者并一次从您的输入 csv 文件中读取一行,然后根据计数条件将其发送到特定主题。

psuedo code:
while no row left in csv:
    row = readrow()
    if row.counts<20
       producer.send(topicA,row)
    else
       producer.send(topicB,row)

你可以关注https://towardsdatascience.com/kafka-python-explained-in-10-lines-of-code-800e3e07dad1 来轻松理解kafka python

   StreamsBuilder builder=new StreamsBuilder();


    KStream<String, String> inputTopic = builder.stream("Trans_Topic");

            inputTopic
                   .filter((k,v)->{
                       return Long.parseLong(v.split(",")[6]) < 20L; //6 is the column id.
                   }).to("trans_topic_result_1");
    inputTopic
            .filter((k,v)->{
                return Long.parseLong(v.split(",")[6]) > 20L;
            }).to("trans_topic_result_2");