事件计数的窗口聚合
A windowed aggregation on event count
我已经对我的kafka事件进行了分组:
private static void createImportStream(final StreamsBuilder builder, final Collection<String> topics) {
final KStream<byte[], GraphEvent> stream = builder.stream(topics, Consumed.with(Serdes.ByteArray(), new UserEventThriftSerde()));
stream.filter((key, request) -> {
return Objects.nonNull(request);
}).groupBy(
(key, value) -> Integer.valueOf(value.getSourceType()),
Grouped.with(Serdes.Integer(), new UserEventThriftSerde()))
.aggregate(ArrayList::new, (key, value, aggregatedValue) -> {
aggregatedValue.add(value);
return aggregatedValue;
},
Materialized.with(Serdes.Integer(), new ArrayListSerde<UserEvent>(new UserEventThriftSerde()))
).toStream();
}
如何添加 window
但不是基于时间,而是基于事件的数量。
原因是事件将是批量转储,时间窗口聚合不适合,因为所有事件都可能出现在相同的几秒钟内。
Kafka Streams 不支持开箱即用的基于计数的windows,因为它们是不确定的,并且很难处理乱序数据。
您可以使用处理器 API 为您的用例构建自定义运算符,而不是使用 DSL。
我已经对我的kafka事件进行了分组:
private static void createImportStream(final StreamsBuilder builder, final Collection<String> topics) {
final KStream<byte[], GraphEvent> stream = builder.stream(topics, Consumed.with(Serdes.ByteArray(), new UserEventThriftSerde()));
stream.filter((key, request) -> {
return Objects.nonNull(request);
}).groupBy(
(key, value) -> Integer.valueOf(value.getSourceType()),
Grouped.with(Serdes.Integer(), new UserEventThriftSerde()))
.aggregate(ArrayList::new, (key, value, aggregatedValue) -> {
aggregatedValue.add(value);
return aggregatedValue;
},
Materialized.with(Serdes.Integer(), new ArrayListSerde<UserEvent>(new UserEventThriftSerde()))
).toStream();
}
如何添加 window
但不是基于时间,而是基于事件的数量。
原因是事件将是批量转储,时间窗口聚合不适合,因为所有事件都可能出现在相同的几秒钟内。
Kafka Streams 不支持开箱即用的基于计数的windows,因为它们是不确定的,并且很难处理乱序数据。
您可以使用处理器 API 为您的用例构建自定义运算符,而不是使用 DSL。