How to count the frequency of each ID in a Kafka topic
I have a Kafka topic Tranfer_History into which I am streaming a CSV file. Now I want to count the number of occurrences of each PARTY_ID. After that I want to apply a transformation: if the count is less than 20, write the record to a new topic CHURN, and if it is greater than 20, write it to a new topic LOYAL.
# I am using Java
public class FirstFilterer {

    public static void main(String[] args) {
        final StreamsBuilder builder = new StreamsBuilder();

        /* input messages example:
           {"155555","11.11.2016 11:12"}
           {"155555","11.11.2016 13:12"}
           {"155556","11.11.2016 13:12"}
           result to be achieved:
           {"155555","2"}
           {"155556","1"}
        */
        builder.stream("test_topic_3")
                // .map()
                .groupByKey()
                // .windowedBy(Window) // This may or may not be required
                .count()
                .toStream()
                .map(
                        (key, count) -> new KeyValue<>(key.toString(), count)
                )
                .to("test_output_filtered_3"); // this topic is empty after the run
I am new to Kafka and don't know much, please help me out.
This can easily be achieved with Kafka Streams. First, make sure you have some background on KStream and KTable. You need to follow the steps below.
builder.<KeyType, ValueType>stream(YourInputTopic)
        .map(...)                                              // e.g. re-key each record by its PARTY_ID
        .groupByKey()
        .windowedBy(TimeWindows.of(Duration.ofSeconds(10)))    // this may or may not be required in your case
        .count()
        .toStream()
        .map((Windowed<String> key, Long count) -> new KeyValue<>(key.key(), count.toString()))
        .filter((k, v) -> Long.parseLong(v) > 20)              // this is the filter
        .to(TopicName);
Note: this is just pseudocode, meant to give you an idea of how to achieve this.
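To make this concrete, here is a minimal, self-contained sketch of the whole application, assuming the CSV lines arrive as plain string values with the PARTY_ID as the first column, that no windowing is needed (a plain running count), and using the topic names Tranfer_History, CHURN and LOYAL from the question. The class name PartyIdCounter, the application id and the bootstrap server address are placeholders you would adapt to your environment.

import java.util.Properties;

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

public class PartyIdCounter {

    public static void main(String[] args) {
        // Placeholder configuration; adjust to your environment.
        final Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "party-id-counter");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
        props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

        final StreamsBuilder builder = new StreamsBuilder();

        // Assumption: each CSV line arrives as the record value, e.g. "155555,11.11.2016 11:12",
        // with the PARTY_ID in the first column.
        final KStream<String, Long> countsPerPartyId = builder
                .<String, String>stream("Tranfer_History")
                .map((key, line) -> KeyValue.pair(line.split(",")[0], line)) // re-key by PARTY_ID
                .groupByKey()
                .count()      // running count per PARTY_ID as a KTable<String, Long>
                .toStream();

        // Route each updated count: fewer than 20 occurrences goes to CHURN ...
        countsPerPartyId
                .filter((partyId, count) -> count < 20)
                .mapValues(count -> count.toString())
                .to("CHURN", Produced.with(Serdes.String(), Serdes.String()));

        // ... and more than 20 occurrences goes to LOYAL.
        countsPerPartyId
                .filter((partyId, count) -> count > 20)
                .mapValues(count -> count.toString())
                .to("LOYAL", Produced.with(Serdes.String(), Serdes.String()));

        // The topology has to be started, otherwise the output topics stay empty.
        final KafkaStreams streams = new KafkaStreams(builder.build(), props);
        streams.start();
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }
}

Two things to keep in mind: count() on an un-windowed grouped stream emits an updated total every time another record for that PARTY_ID arrives, so a party can show up in CHURN first and later in LOYAL once its count passes 20. Also, the snippet in the question stops right after the call to to(...); if the topology is never wrapped in a KafkaStreams instance and started, nothing is processed at all, which would explain the empty output topic.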