Complex aggregation
I have data in one topic that needs to be counted at several levels, and all the code and articles I can find only cover the word-count example.
Sample data:
serial: 123
country: us
date: 01/05/2018
state: ny
city: ny
visitors: 5

serial: 123
country: us
date: 01/06/2018
state: ny
city: queens
visitors: 10

serial: 456
date: 01/06/2018
country: us
state: ny
city: queens
visitors: 27

serial: 123
date: 01/06/2018
country: us
state: ny
city: ny
visitors: 867
I have the filtering and the groupBy done, but how do I do the aggregation?
Sorry for mixing Java 8 and pre-Java-8 style; I prefer 8 but am still learning it.
KTable<String, CountryVisitorModel> countryStream1 = inStream
        .filter((key, value) -> value.status.equalsIgnoreCase("TEST_DATA"))
        .groupBy((key, value) -> value.serial)
        .aggregate(
                new Initializer<CountryVisitorModel>() {
                    @Override
                    public CountryVisitorModel apply() {
                        return new CountryVisitorModel();
                    }
                },
                new Aggregator<String, InputModel, CountryVisitorModel>() {
                    @Override
                    public CountryVisitorModel apply(String key, InputModel value, CountryVisitorModel aggregate) {
                        aggregate.serial = value.serial;
                        aggregate.country_name = value.country_name;
                        aggregate.city_name = value.city_name;
                        aggregate.country_count++;
                        aggregate.city_count++;
                        aggregate.ip_count++;
                        return aggregate;
                    }
                },
                Materialized.with(stringSerde, visitorSerde));
For all records with the same serial_id (which will be the group-by key), I want to count the total number of visitors:
serial country city total_num_visitors
If each record contributes to exactly one count, I would suggest to branch() the stream and count each sub-stream:
KStream stream = builder.stream(...);
KStream[] subStreams = stream.branch(...);
// each record of `stream` will be contained in exactly _one_ sub-stream
subStreams[0].groupByKey().count(); // or aggregate() instead of count()
subStreams[1].groupByKey().count();
// ...
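For illustration, here is a minimal sketch of the branch() approach; the topic name "visitor-input" and the predicates are assumptions, and InputModel is the value type from the question:

// Hypothetical sketch, not the asker's actual topology: each record is
// routed into exactly one sub-stream, then counted per key.
StreamsBuilder builder = new StreamsBuilder();
KStream<String, InputModel> stream = builder.stream("visitor-input"); // assumed topic name

KStream<String, InputModel>[] subStreams = stream.branch(
        (key, value) -> "us".equalsIgnoreCase(value.country_name), // sub-stream 0: US records
        (key, value) -> true);                                     // sub-stream 1: everything else

subStreams[0].groupByKey().count(); // or aggregate() instead of count()
subStreams[1].groupByKey().count();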
If branching does not work because a single record needs to go into multiple counts, you can "broadcast" the stream and filter:
KStream stream = builder.stream(...);
// each record in `stream` will be "duplicated" and sent to all filters
stream.filter(...).groupByKey().count(); // or aggregate() instead of count()
stream.filter(...).groupByKey().count();
// ...
You use the same KStream object multiple times and apply multiple operators to it (in our case filter()), and each record is "broadcasted" to all of those operators. Note that for this case the records (i.e., the objects) are not physically copied; the same input record object is used to call each filter().
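A concrete sketch of the broadcast-and-filter pattern, under the same assumptions as the branch() sketch above (hypothetical topic name and predicates):

// Hypothetical sketch: the same KStream object is reused, so every record
// flows through every filter(); the record objects are not copied.
KStream<String, InputModel> stream = builder.stream("visitor-input"); // assumed topic name

stream.filter((key, value) -> value.country_name != null) // records carrying a country
      .groupByKey()
      .count();

stream.filter((key, value) -> value.city_name != null)    // records carrying a city
      .groupByKey()
      .count();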
You can keep the field values in a Set and use Set#size to get the distinct count per key.
import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.junit.jupiter.api.Test;
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;
import static org.assertj.core.api.Assertions.assertThat;
public class SomeTest {

    public static class VisitorDetails {
        public long serial;
        public String country;
        public long date;
        public String state;
        public String city;
        public long visitors;
    }

    // aggregate holds the distinct values seen so far plus a running visitor total
    public static class Aggregate {
        public Set<String> countrySet = new HashSet<>();
        public long countryCounter;
        public Set<String> citySet = new HashSet<>();
        public long cityCounter;
        public long totalVisitorCounter = 0;
    }

    public static class CustomSerializer<T> implements Serializer<T> {
        private static final Charset CHARSET = StandardCharsets.UTF_8;
        private static final Gson gson = new Gson();

        @Override
        public byte[] serialize(String topic, T data) {
            String line = gson.toJson(data);
            return line.getBytes(CHARSET);
        }
    }

    public static class CustomDeserializer<T> implements Deserializer<T> {
        private static final Charset CHARSET = StandardCharsets.UTF_8;
        private static final Gson gson = new Gson();
        private final Class<T> tClass;

        public CustomDeserializer(Class<T> tClass) {
            this.tClass = tClass;
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            try {
                String json = new String(data, CHARSET);
                return gson.fromJson(json, tClass);
            } catch (Exception e) {
                throw new IllegalArgumentException("Deserialization failed:", e);
            }
        }
    }

    public static class AggregateSerde implements Serde<Aggregate> {
        @Override
        public Serializer<Aggregate> serializer() {
            return new CustomSerializer<>();
        }

        @Override
        public Deserializer<Aggregate> deserializer() {
            return new CustomDeserializer<>(Aggregate.class);
        }
    }

    public static class VisitorDetailsSerde implements Serde<VisitorDetails> {
        @Override
        public Serializer<VisitorDetails> serializer() {
            return new CustomSerializer<>();
        }

        @Override
        public Deserializer<VisitorDetails> deserializer() {
            return new CustomDeserializer<>(VisitorDetails.class);
        }
    }

    @Test
    void test() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.Long(), new VisitorDetailsSerde()))
                .groupByKey(Grouped.with(Serdes.Long(), new VisitorDetailsSerde()))
                .aggregate(
                        Aggregate::new,
                        (key, value, agg) -> {
                            agg.countrySet.add(value.country);
                            agg.countryCounter = agg.countrySet.size(); // distinct countries seen
                            agg.citySet.add(value.city);
                            agg.cityCounter = agg.citySet.size();       // distinct cities seen
                            agg.totalVisitorCounter += value.visitors;
                            return agg;
                        },
                        Materialized.<Long, Aggregate, KeyValueStore<Bytes, byte[]>>as("store-name-2")
                                .withKeySerde(Serdes.Long())
                                .withValueSerde(new AggregateSerde())
                                .withLoggingDisabled() // only for testing; do not disable in prod,
                                                       // the changelog provides fault tolerance
                );

        Topology topology = builder.build();
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");
        TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);

        TestInputTopic<Long, VisitorDetails> inputTopic = testDriver.createInputTopic("input",
                Serdes.Long().serializer(), new CustomSerializer<VisitorDetails>());
        inputTopic.pipeInput(123L, visitorDetail(123L, "usa", "ny", 10L));
        inputTopic.pipeInput(123L, visitorDetail(123L, "usa", "la", 20L));
        inputTopic.pipeInput(123L, visitorDetail(123L, "pl", "krk", 30L));
        inputTopic.pipeInput(123L, visitorDetail(123L, "pl", "wrs", 40L));
        inputTopic.pipeInput(123L, visitorDetail(123L, "pl", "krk", 50L));

        KeyValueStore<Long, Aggregate> keyValueStore = testDriver.getKeyValueStore("store-name-2");
        assertThat(keyValueStore.get(123L).cityCounter).isEqualTo(4);     // ny, la, krk, wrs
        assertThat(keyValueStore.get(123L).countryCounter).isEqualTo(2);  // usa, pl
        assertThat(keyValueStore.get(123L).totalVisitorCounter).isEqualTo(150);

        testDriver.close();
    }

    private VisitorDetails visitorDetail(long serial, String country, String city, long visitors) {
        VisitorDetails visitorDetails = new VisitorDetails();
        visitorDetails.serial = serial;
        visitorDetails.country = country;
        visitorDetails.city = city;
        visitorDetails.visitors = visitors;
        return visitorDetails;
    }
}