如何理解kafka流聚合?
How to understand kafka streams aggregation?
我是 kafka 的新手,正在学习它。我只是在为员工汇总数据,但 运行 遇到了问题。有人可以帮忙吗
我有一个主题 timeoffs,键 time_off_id 和类型对象的值也包含员工 ID。所以我想建立一个商店,其中员工 ID 应该是键,值应该是该员工时间的列表 offs.But 我遵循以下方法,但 运行 有问题。汇总数据时,方法参考中的 return 类型错误:无法将 ArrayList 转换为 VR。你能帮帮我吗
代码:
KTable<String, TimeOff> timeoffs = builder.table(topic);
KGroupedTable<String, TimeOff> groupedTable = timeoffs.groupBy(
(key, value) -> KeyValue.pair(value.getEmployeeId(), value)
);
groupedTable.aggregate(ArrayList<TimeOff>::new, (k, newValue, aggValue) -> {
aggValue.add(newValue);
return aggValue;
}, Materialized.as("NewStore"));
我也尝试过这种方法,但还是没有解决问题。
TimeOffList class:
package com.kafka.productiontest.models;
import java.util.ArrayList;
public class TimeOffList {
ArrayList list = new ArrayList<TimeOff>();
public TimeOffList add(Object s) {
list.add(s);
return this;
}
}
流式传输中 class:
groupedTable.aggregate(TimeOffList::new,
(k, newValue, aggValue) -> (TimeOffList) aggValue.add(newValue));
实施您的解决方案后,这个问题消失了,但现在面临 serde 问题。我已经实施了 TimeOffListSerde。请检查下面的代码
KStream<String, TimeOff> source = builder.stream(topic);
source.groupBy((k, v) -> v.getEmployeeId())
.aggregate(ArrayList::new,
(key, value, aggregate) -> {
aggregate.add(value);
return aggregate;
}, Materialized.as("NewStore").withValueSerde(new TimeOffListSerde(TimeOff.class)));
TimeOffListSerde.java
package com.kafka.productiontest.models;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import java.util.ArrayList;
import java.util.Map;
public class TimeOffListSerde implements Serde<ArrayList<TimeOff>> {
private Serde<ArrayList<TimeOff>> inner;
public TimeOffListSerde() {
}
public TimeOffListSerde(Serde<TimeOff> serde){
inner = Serdes.serdeFrom(new TimeOffListSerializer(serde.serializer()), new TimeOffListDeserializer(serde.deserializer()));
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.serializer().configure(configs, isKey);
inner.deserializer().configure(configs, isKey);
}
@Override
public void close() {
inner.serializer().close();
inner.deserializer().close();
}
@Override
public Serializer<ArrayList<TimeOff>> serializer() {
return inner.serializer();
}
@Override
public Deserializer<ArrayList<TimeOff>> deserializer() {
return inner.deserializer();
}
}
你想要这个吗?
KStream<String, TimeOff> source = builder.stream(sourceTopic);
KTable<String, List<TimeOff>> table = source.groupBy((k, v) -> v.getId())
.aggregate(ArrayList::new,
(key, value, aggregate) -> {
aggregate.add(value);
return aggregate;
}, Materialized.as("NewStore"));
我是 kafka 的新手,正在学习它。我只是在为员工汇总数据,但 运行 遇到了问题。有人可以帮忙吗
我有一个主题 timeoffs,键 time_off_id 和类型对象的值也包含员工 ID。所以我想建立一个商店,其中员工 ID 应该是键,值应该是该员工时间的列表 offs.But 我遵循以下方法,但 运行 有问题。汇总数据时,方法参考中的 return 类型错误:无法将 ArrayList 转换为 VR。你能帮帮我吗
代码:
KTable<String, TimeOff> timeoffs = builder.table(topic);
KGroupedTable<String, TimeOff> groupedTable = timeoffs.groupBy(
(key, value) -> KeyValue.pair(value.getEmployeeId(), value)
);
groupedTable.aggregate(ArrayList<TimeOff>::new, (k, newValue, aggValue) -> {
aggValue.add(newValue);
return aggValue;
}, Materialized.as("NewStore"));
我也尝试过这种方法,但还是没有解决问题。
TimeOffList class:
package com.kafka.productiontest.models;
import java.util.ArrayList;
public class TimeOffList {
ArrayList list = new ArrayList<TimeOff>();
public TimeOffList add(Object s) {
list.add(s);
return this;
}
}
流式传输中 class:
groupedTable.aggregate(TimeOffList::new,
(k, newValue, aggValue) -> (TimeOffList) aggValue.add(newValue));
实施您的解决方案后,这个问题消失了,但现在面临 serde 问题。我已经实施了 TimeOffListSerde。请检查下面的代码
KStream<String, TimeOff> source = builder.stream(topic);
source.groupBy((k, v) -> v.getEmployeeId())
.aggregate(ArrayList::new,
(key, value, aggregate) -> {
aggregate.add(value);
return aggregate;
}, Materialized.as("NewStore").withValueSerde(new TimeOffListSerde(TimeOff.class)));
TimeOffListSerde.java
package com.kafka.productiontest.models;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import java.util.ArrayList;
import java.util.Map;
public class TimeOffListSerde implements Serde<ArrayList<TimeOff>> {
private Serde<ArrayList<TimeOff>> inner;
public TimeOffListSerde() {
}
public TimeOffListSerde(Serde<TimeOff> serde){
inner = Serdes.serdeFrom(new TimeOffListSerializer(serde.serializer()), new TimeOffListDeserializer(serde.deserializer()));
}
@Override
public void configure(Map<String, ?> configs, boolean isKey) {
inner.serializer().configure(configs, isKey);
inner.deserializer().configure(configs, isKey);
}
@Override
public void close() {
inner.serializer().close();
inner.deserializer().close();
}
@Override
public Serializer<ArrayList<TimeOff>> serializer() {
return inner.serializer();
}
@Override
public Deserializer<ArrayList<TimeOff>> deserializer() {
return inner.deserializer();
}
}
你想要这个吗?
KStream<String, TimeOff> source = builder.stream(sourceTopic);
KTable<String, List<TimeOff>> table = source.groupBy((k, v) -> v.getId())
.aggregate(ArrayList::new,
(key, value, aggregate) -> {
aggregate.add(value);
return aggregate;
}, Materialized.as("NewStore"));