如何根据过滤器将csv数据拆分为两个不同的kafka主题
How to split the csv data to two different kafka topics based on filter
我有一个 Kafka 主题 Customer,我在其中流式传输了一个
.csv 文件。现在是否可以根据
转换kafka中的数据
计数。比如如果计数 小于 比 20 发送它到 topic A 并且如果计数 大于 20 发送至 主题 B
我是 kafka 的新手,我正在尝试这样但它不起作用
builder.stream("Customer")
.groupByKey()
.count()
.toStream()
.filter((k,v)-> String(v) > 20)
.to("test_A");
这个代码是错误的我很确定,但是请任何人帮助我
您现在正在做的是根据条件仅将一部分记录发送到一个主题,然后丢弃其他记录。如果您想将 then 的一部分发送到一个主题,而将其他部分发送到另一个主题,您应该使用分支运算符。
像这样:
KStream<K, Long>[] branches = builder.stream("Customer")
.groupByKey()
.count()
.toStream()
.branch((k, v) -> v > 20),
(k, v) -> v <= 20);
branches[0].to("topicB");
branches[1].to("topicA");
你应该注意的另一件事是比较必须在数字之间进行,而不是字符串,因为当你对字符串使用更大的比较器时,你是在比较字符串的长度和字典顺序
简单的解决方案是创建一个生产者并一次从您的输入 csv 文件中读取一行,然后根据计数条件将其发送到特定主题。
psuedo code:
while no row left in csv:
row = readrow()
if row.counts<20
producer.send(topicA,row)
else
producer.send(topicB,row)
你可以关注https://towardsdatascience.com/kafka-python-explained-in-10-lines-of-code-800e3e07dad1 来轻松理解kafka python
StreamsBuilder builder=new StreamsBuilder();
KStream<String, String> inputTopic = builder.stream("Trans_Topic");
inputTopic
.filter((k,v)->{
return Long.parseLong(v.split(",")[6]) < 20L; //6 is the column id.
}).to("trans_topic_result_1");
inputTopic
.filter((k,v)->{
return Long.parseLong(v.split(",")[6]) > 20L;
}).to("trans_topic_result_2");
我有一个 Kafka 主题 Customer,我在其中流式传输了一个
.csv 文件。现在是否可以根据
转换kafka中的数据
计数。比如如果计数 小于 比 20 发送它到 topic A 并且如果计数 大于 20 发送至 主题 B
我是 kafka 的新手,我正在尝试这样但它不起作用
builder.stream("Customer")
.groupByKey()
.count()
.toStream()
.filter((k,v)-> String(v) > 20)
.to("test_A");
这个代码是错误的我很确定,但是请任何人帮助我
您现在正在做的是根据条件仅将一部分记录发送到一个主题,然后丢弃其他记录。如果您想将 then 的一部分发送到一个主题,而将其他部分发送到另一个主题,您应该使用分支运算符。
像这样:
KStream<K, Long>[] branches = builder.stream("Customer")
.groupByKey()
.count()
.toStream()
.branch((k, v) -> v > 20),
(k, v) -> v <= 20);
branches[0].to("topicB");
branches[1].to("topicA");
你应该注意的另一件事是比较必须在数字之间进行,而不是字符串,因为当你对字符串使用更大的比较器时,你是在比较字符串的长度和字典顺序
简单的解决方案是创建一个生产者并一次从您的输入 csv 文件中读取一行,然后根据计数条件将其发送到特定主题。
psuedo code:
while no row left in csv:
row = readrow()
if row.counts<20
producer.send(topicA,row)
else
producer.send(topicB,row)
你可以关注https://towardsdatascience.com/kafka-python-explained-in-10-lines-of-code-800e3e07dad1 来轻松理解kafka python
StreamsBuilder builder=new StreamsBuilder();
KStream<String, String> inputTopic = builder.stream("Trans_Topic");
inputTopic
.filter((k,v)->{
return Long.parseLong(v.split(",")[6]) < 20L; //6 is the column id.
}).to("trans_topic_result_1");
inputTopic
.filter((k,v)->{
return Long.parseLong(v.split(",")[6]) > 20L;
}).to("trans_topic_result_2");