复杂聚合

Complex aggregation

我在一个主题中有数据需要在多个级别进行统计,所有代码和文章都只提到字数统计示例。

数据示例为:

序列号:123 国家:我们 日期:01/05/2018 州:纽约 城市:纽约 访问者:5

序列号:123 国家:我们 日期:01/06/2018 州:纽约 城市:皇后区 访客:10

序列号:456 日期:01/06/2018 国家:我们 州:纽约 城市:皇后区 访客:27

序列号:123 日期:01/06/2018 国家:我们 州:纽约 城市:纽约 访客:867

我已经完成了过滤,groupBy 但聚合? 对不起 Java 8 和 & 混合,我更喜欢 8 但同时学习它

KTable<String, CountryVisitorModel> countryStream1 = inStream
    .filter((key, value) -> value.status.equalsIgnoreCase("TEST_DATA"))
    .groupBy((key, value) -> value.serial)
    .aggregate(
            new Initializer<CountryVisitorModel>() {

            public CountryVisitorModelapply() {
                return new CountryVisitorModel();
            }
        },
        new Aggregator<String, InputModel, CountryVisitorModel>() {

            @Override
            public CountryVisitorModelapply(String key, InputModel value, CountryVisitorModel aggregate) {

    aggregate.serial = value.serial;
    aggregate.country_name = value.country_name;
    aggregate.city_name = value.city_name;

    aggregate.country_count++;
    aggregate.city_count++;
    aggregate.ip_count++;

        //
    return aggregate;
       }
},
Materialized.with(stringSerde, visitorSerde));

对于所有相等的 serial_id(这将是分组依据) 统计访客总数:

连续国家城市total_num_visitors

如果每条记录只贡献一个计数,我会建议 branch() 流和计数 sub-stream:

KStream stream = builder.stream(...)
KStream[] subStreams = stream.branch(...);

// each record of `stream` will be contained in exactly _one_ `substream`
subStream[0].grouByKey().count(); // or aggregate() instead of count()
subStream[1].grouByKey().count();
// ...

如果分支不起作用,因为单个记录需要进入多个计数,您可以"broadcast"并过滤:

KStream stream = builder.stream(...)

// each record in `stream` will be "duplicated" and sent to all `filters`
stream.filter(...).grouByKey().count(); // or aggregate() instead of count()
stream.filter(...).grouByKey().count();
// ...

多次使用同一个 KStream 对象并应用多个运算符(在我们的例子中 filter(),每条记录将 "broadcasted" 用于所有运算符)。请注意,对于这种情况,记录(即对象)并未进行物理复制,但相同的输入记录对象用于调用每个 filter().

您可以将字段值保存在一个集合中,并使用 Set#size 方法获取每个键的计数。

import com.google.gson.Gson;
import org.apache.kafka.common.serialization.Deserializer;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.common.serialization.Serializer;
import org.apache.kafka.common.utils.Bytes;
import org.apache.kafka.streams.*;
import org.apache.kafka.streams.kstream.Consumed;
import org.apache.kafka.streams.kstream.Grouped;
import org.apache.kafka.streams.kstream.Materialized;
import org.apache.kafka.streams.state.KeyValueStore;
import org.junit.jupiter.api.Test;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.HashSet;
import java.util.Properties;
import java.util.Set;

import static org.assertj.core.api.Assertions.assertThat;

public class SomeTest {

    public static class VisitorDetails {
        public long serial;
        public String country;
        public long date;
        public String state;
        public String city;
        public long visitors;
    }

    public static class Aggregate {
        public Set <String> countrySet = new HashSet <>();
        public long countryCounter;

        public Set <String> citySet = new HashSet <>();
        public long cityCounter;

        public long totalVisitorCounter = 0;
    }

    public static class CustomSerializer<T> implements Serializer <T> {
        private static final Charset CHARSET = StandardCharsets.UTF_8;
        static private final Gson gson = new Gson();


        @Override
        public byte[] serialize(String topic, T data) {
            String line = gson.toJson(data);
            return line.getBytes(CHARSET);
        }
    }

    public static class CustomDeserializer<T> implements Deserializer <T> {
        private static final Charset CHARSET = StandardCharsets.UTF_8;
        static private final Gson gson = new Gson();

        private final Class <T> tClass;

        public CustomDeserializer(Class <T> tClass) {
            this.tClass = tClass;
        }

        @Override
        public T deserialize(String topic, byte[] data) {
            try {
                String person = new String(data, CHARSET);
                return gson.fromJson(person, tClass);
            } catch (Exception e) {
                throw new IllegalArgumentException("Deserialization failed:", e);
            }
        }
    }

    public static class AggregateSerde implements Serde <Aggregate> {

        @Override
        public Serializer <Aggregate> serializer() {
            return new CustomSerializer <Aggregate>();
        }

        @Override
        public Deserializer <Aggregate> deserializer() {
            return new CustomDeserializer <Aggregate>(Aggregate.class);
        }
    }

    public static class VisitorDetailsSerde implements Serde <VisitorDetails> {

        @Override
        public Serializer <VisitorDetails> serializer() {
            return new CustomSerializer <VisitorDetails>();
        }

        @Override
        public Deserializer <VisitorDetails> deserializer() {
            return new CustomDeserializer <VisitorDetails>(VisitorDetails.class);
        }
    }

    @Test
    void test() {

        StreamsBuilder builder = new StreamsBuilder();
        builder.stream("input", Consumed.with(Serdes.Long(), new VisitorDetailsSerde()))
                .groupByKey(Grouped.with(Serdes.Long(), new VisitorDetailsSerde()))
                .aggregate(
                        Aggregate::new,
                        (key, value, agg) -> {

                            agg.countrySet.add(value.country);
                            agg.countryCounter = agg.countrySet.size();

                            agg.citySet.add(value.city);
                            agg.cityCounter = agg.citySet.size();

                            agg.totalVisitorCounter += value.visitors;
                            return agg;

                        },
                        Materialized. <Long, Aggregate, KeyValueStore <Bytes, byte[]>>as("store-name-2")
                                .withKeySerde(Serdes.Long())
                                .withValueSerde(new AggregateSerde())
                                .withLoggingDisabled() // only for testing,
                                // recommended to not disable on prod as it provides fault tolerance

                );

        Topology topology = builder.build();
        
        Properties properties = new Properties();
        properties.put(StreamsConfig.APPLICATION_ID_CONFIG, "test");
        properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "dummy:1234");

        TopologyTestDriver testDriver = new TopologyTestDriver(topology, properties);

        TestInputTopic <Long, VisitorDetails> inputTopic = testDriver.createInputTopic("input",
                Serdes.Long().serializer(), new CustomSerializer <VisitorDetails>());

        inputTopic.pipeInput(123L, visitorDetail(123L, "usa", "ny", 10L));
        inputTopic.pipeInput(123L, visitorDetail(123L, "usa", "la", 20L));
        inputTopic.pipeInput(123L, visitorDetail(123L, "pl", "krk", 30L));
        inputTopic.pipeInput(123L, visitorDetail(123L, "pl", "wrs", 40L));
        inputTopic.pipeInput(123L, visitorDetail(123L, "pl", "krk", 50L));

        testDriver.getAllStateStores();
        KeyValueStore <Long, Aggregate> keyValueStore = testDriver. <Long, Aggregate>getKeyValueStore("store-name-2");

        assertThat(keyValueStore.get(123L).cityCounter).isEqualTo(4);
        assertThat(keyValueStore.get(123L).countryCounter).isEqualTo(2);
        assertThat(keyValueStore.get(123L).totalVisitorCounter).isEqualTo(150);

        testDriver.close();

    }

    private VisitorDetails visitorDetail(long serial, String country, String city, long visitors) {
        VisitorDetails visitorDetails = new VisitorDetails();
        visitorDetails.serial = serial;
        visitorDetails.country = country;
        visitorDetails.city = city;
        visitorDetails.visitors = visitors;
        return visitorDetails;
    }
}