Kafka windowed stream: make grace and suppress key-aware

I currently have a simple stream of data, for example:

|-----|--------|-------|
| Key | TS(ms) | Value |
|-----|--------|-------|
|  A  |   1000 |   0   |
|  A  |   1000 |   0   |
|  A  |  61000 |   0   |
|  A  |  61000 |   0   |
|  A  | 121000 |   0   |
|  A  | 121000 |   0   |
|  A  | 181000 |  10   |
|  A  | 181000 |  10   |
|  A  | 241000 |  10   |
|  A  | 241000 |  10   |
|  B  |   1000 |   0   |
|  B  |   1000 |   0   |
|  B  |  61000 |   0   |
|  B  |  61000 |   0   |
|  B  | 121000 |   0   |
|  B  | 121000 |   0   |
|  B  | 181000 |  10   |
|  B  | 181000 |  10   |
|  B  |   1000 |  10   |
|  B  | 241000 |  10   |
|  B  | 241000 |  10   |
|-----|--------|-------|

This is also the order in which I publish the data to the topic. The values are not really integers but Avro values; the keys are strings.

My code looks like this:

KStream<Windowed<String>, Long> aggregatedStream = inputStream
   .groupByKey()
   .windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO))
   .count()
   .toStream();

aggregatedStream.print(Printed.toSysOut());

The output of print() is:

[KTABLE-TOSTREAM-0000000003]: [A@0/60000], 1
[KTABLE-TOSTREAM-0000000003]: [A@0/60000], 2
[KTABLE-TOSTREAM-0000000003]: [A@60000/120000], 1
[KTABLE-TOSTREAM-0000000003]: [A@60000/120000], 2
[KTABLE-TOSTREAM-0000000003]: [A@120000/180000], 1
[KTABLE-TOSTREAM-0000000003]: [A@120000/180000], 2
[KTABLE-TOSTREAM-0000000003]: [A@180000/240000], 1
[KTABLE-TOSTREAM-0000000003]: [A@180000/240000], 2
[KTABLE-TOSTREAM-0000000003]: [A@240000/300000], 1
[KTABLE-TOSTREAM-0000000003]: [A@240000/300000], 2
[KTABLE-TOSTREAM-0000000003]: [B@240000/300000], 1
[KTABLE-TOSTREAM-0000000003]: [B@240000/300000], 2

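It seems that the grace period applies globally, independently of the stream's key. What I expected instead (if possible) is to receive all 10 window counts for key A and all 10 window counts for key B, i.e. the grace period would close windows based only on the records of each key. Is this possible?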

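grace() and suppress() appear to use a single global timestamp per partition (the stream time), so it is not possible to have a different timestamp for each key. In your example, key A has already pushed stream time to 241000 by the time B's records arrive, so every B window ending at or before 241000 is already closed and only B@240000/300000 is emitted.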
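To make the per-partition stream time concrete, here is a small plain-Java simulation (no Kafka dependency) of the window-close check. It assumes the rule is "a record is dropped once its window end is at or before stream time minus grace", which is my reading of Kafka Streams' behaviour rather than a copy of its code; the class and method names are hypothetical. It replays the records from the table in publish order, once with a single clock for the whole partition and once with one clock per key.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class GraceSimulation {
    static final long WINDOW_SIZE_MS = 60_000L; // TimeWindows.of(Duration.ofMinutes(1))
    static final long GRACE_MS = 0L;            // .grace(Duration.ZERO)

    public record Event(String key, long timestampMs) {}

    /** The records from the question's table, in the order they were published. */
    public static List<Event> sampleEvents() {
        List<Event> events = new ArrayList<>();
        for (long ts : new long[] {1000, 1000, 61000, 61000, 121000, 121000, 181000, 181000, 241000, 241000}) {
            events.add(new Event("A", ts));
        }
        for (long ts : new long[] {1000, 1000, 61000, 61000, 121000, 121000, 181000, 181000, 1000, 241000, 241000}) {
            events.add(new Event("B", ts));
        }
        return events;
    }

    /**
     * Counts, per key, how many records land in a still-open tumbling window.
     * perKeyClock = false: one stream time for the whole partition (assumed grace() behaviour).
     * perKeyClock = true: a separate stream time per key (the behaviour the question asks for).
     */
    public static Map<String, Integer> acceptedPerKey(List<Event> events, boolean perKeyClock) {
        Map<String, Long> clockByKey = new LinkedHashMap<>();
        Map<String, Integer> accepted = new LinkedHashMap<>();
        long partitionClock = Long.MIN_VALUE;
        for (Event e : events) {
            long streamTime;
            if (perKeyClock) {
                streamTime = Math.max(clockByKey.getOrDefault(e.key(), Long.MIN_VALUE), e.timestampMs());
                clockByKey.put(e.key(), streamTime);
            } else {
                partitionClock = Math.max(partitionClock, e.timestampMs());
                streamTime = partitionClock;
            }
            // End of the tumbling window the record falls into.
            long windowEnd = e.timestampMs() - (e.timestampMs() % WINDOW_SIZE_MS) + WINDOW_SIZE_MS;
            long closeTime = streamTime - GRACE_MS;
            accepted.putIfAbsent(e.key(), 0);
            if (windowEnd > closeTime) { // window still open: the record is counted
                accepted.merge(e.key(), 1, Integer::sum);
            }
        }
        return accepted;
    }

    public static void main(String[] args) {
        System.out.println("partition clock: " + acceptedPerKey(sampleEvents(), false));
        System.out.println("per-key clock:   " + acceptedPerKey(sampleEvents(), true));
    }
}
```

With the partition-wide clock the simulation accepts 10 records for A and 2 for B, matching the 12 lines printed in the question. With a per-key clock B keeps 10 records, and only the late B@1000 record (published after B@181000) is dropped.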

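It is possible to stop relying on the grace period (the example below does not call grace() at all) and to use a custom transformer instead of the regular suppress() to implement per-key suppression.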

For example, here is part of our code:

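import java.time.Duration;
import java.util.ArrayList;
import java.util.List;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.TimeWindows;
import org.apache.kafka.streams.kstream.Transformer;
import org.apache.kafka.streams.kstream.TransformerSupplier;
import org.apache.kafka.streams.kstream.Windowed;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;

// The "suppress-store" key-value store must be registered on the StreamsBuilder
// (builder.addStateStore(...)) so it can be connected to the transformer below.
KStream<String, ...> aggregatedStream = pairsStream
    .groupByKey()
    .windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
    .aggregate(...your aggregation logic...)
    .toStream()
    // Per-key suppression: remember the latest window seen for each un-windowed key
    .flatTransform(new TransformerSupplier<Windowed<String>, AggregateOutput, Iterable<KeyValue<String, SuppressedOutput>>>() {
        @Override
        public Transformer<Windowed<String>, AggregateOutput, Iterable<KeyValue<String, SuppressedOutput>>> get() {
            return new Transformer<Windowed<String>, AggregateOutput, Iterable<KeyValue<String, SuppressedOutput>>>() {
                KeyValueStore<String, SuppressedOutput> store;

                @SuppressWarnings("unchecked")
                @Override
                public void init(ProcessorContext context) {
                    store = (KeyValueStore<String, SuppressedOutput>) context.getStateStore("suppress-store");
                }

                @Override
                public Iterable<KeyValue<String, SuppressedOutput>> transform(Windowed<String> window, AggregateOutput sequenceList) {
                    String messageKey = window.key();
                    long windowEndTimestamp = window.window().endTime().toEpochMilli();
                    SuppressedOutput currentSuppressedOutput = new SuppressedOutput(windowEndTimestamp, sequenceList);
                    SuppressedOutput storeValue = store.get(messageKey);
                    List<KeyValue<String, SuppressedOutput>> output = new ArrayList<>();

                    // else-if chain: storeValue must not be dereferenced when it is null
                    if (storeValue == null) {
                        // First time we receive a window for that key
                    } else if (windowEndTimestamp > storeValue.getTimestamp()) {
                        // Received a new window for that key: the stored window can be considered closed
                    } else if (windowEndTimestamp < storeValue.getTimestamp()) {
                        // Window older than the last window we've received for that key:
                        // return early so it does not overwrite the newer stored window
                        return output;
                    }

                    store.put(messageKey, currentSuppressedOutput);
                    return output;
                }

                @Override
                public void close() {
                }
            };
        }
    }, "suppress-store");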
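The decision logic of the transformer above can be modelled and tested outside Kafka. The sketch below is a hypothetical plain-Java model: a HashMap stands in for the "suppress-store" KeyValueStore, and the names PerKeySuppressor, Pending and Emitted are made up for illustration. It also assumes that the "new window" branch, which the original code leaves as a comment, emits the previously stored window for that key.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class PerKeySuppressor {
    /** Latest window end and aggregate seen for a key (hypothetical stand-in for SuppressedOutput). */
    record Pending(long windowEnd, long aggregate) {}

    /** A finalized result: the key, the end of its closed window, and that window's last aggregate. */
    public record Emitted(String key, long windowEnd, long aggregate) {}

    // Plays the role of the "suppress-store" KeyValueStore, keyed by the un-windowed key.
    private final Map<String, Pending> store = new HashMap<>();

    /** Handles one windowed aggregate update and returns the results that became final. */
    public List<Emitted> onUpdate(String key, long windowEnd, long aggregate) {
        List<Emitted> out = new ArrayList<>();
        Pending previous = store.get(key);
        if (previous != null && windowEnd < previous.windowEnd()) {
            // Update for an older window of this key: treated as late and dropped.
            return out;
        }
        if (previous != null && windowEnd > previous.windowEnd()) {
            // A newer window started for this key, so the stored window is considered closed.
            out.add(new Emitted(key, previous.windowEnd(), previous.aggregate()));
        }
        // First window for the key, or an update to the current window: keep the latest aggregate.
        store.put(key, new Pending(windowEnd, aggregate));
        return out;
    }
}
```

Because the comparison only involves windows of the same key, progress on key A never closes a window of key B. The trade-off is that the last window of each key is only emitted once a newer window for that key arrives; flushing it otherwise would need extra logic, for example a wall-clock punctuation, which is not shown here.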