Kafka 窗口化流使优雅和抑制键感知
Kafka windowed stream make grace and suppress key aware
我目前有一个简单的数据流,例如:
|-----|--------|-------|
| Key | TS(ms) | Value |
|-----|--------|-------|
| A | 1000 | 0 |
| A | 1000 | 0 |
| A | 61000 | 0 |
| A | 61000 | 0 |
| A | 121000 | 0 |
| A | 121000 | 0 |
| A | 181000 | 10 |
| A | 181000 | 10 |
| A | 241000 | 10 |
| A | 241000 | 10 |
| B | 1000 | 0 |
| B | 1000 | 0 |
| B | 61000 | 0 |
| B | 61000 | 0 |
| B | 121000 | 0 |
| B | 121000 | 0 |
| B | 181000 | 10 |
| B | 181000 | 10 |
| B | 1000 | 10 |
| B | 241000 | 10 |
| B | 241000 | 10 |
|-----|--------|-------|
这也是我在主题中发布数据的顺序,值不是真正的整数而是 avro 值,但键是字符串。
我的代码是这样的:
KStream<Windowed<String>, Long> aggregatedStream = inputStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO))
.count()
.toStream();
aggregatedStream.print(Printed.toSysOut());
print
的输出是:
[KTABLE-TOSTREAM-0000000003]: [A@0/60000], 1
[KTABLE-TOSTREAM-0000000003]: [A@0/60000], 2
[KTABLE-TOSTREAM-0000000003]: [A@60000/120000], 1
[KTABLE-TOSTREAM-0000000003]: [A@60000/120000], 2
[KTABLE-TOSTREAM-0000000003]: [A@120000/180000], 1
[KTABLE-TOSTREAM-0000000003]: [A@120000/180000], 2
[KTABLE-TOSTREAM-0000000003]: [A@180000/240000], 1
[KTABLE-TOSTREAM-0000000003]: [A@180000/240000], 2
[KTABLE-TOSTREAM-0000000003]: [A@240000/300000], 1
[KTABLE-TOSTREAM-0000000003]: [A@240000/300000], 2
[KTABLE-TOSTREAM-0000000003]: [B@240000/300000], 1
[KTABLE-TOSTREAM-0000000003]: [B@240000/300000], 2
似乎宽限期独立于流的密钥在全球范围内适用,我期望的是(如果可能的话)接收密钥 A 的所有 10 window 计数和 10 window 键 B 的计数。
在某种程度上,宽限期仅根据流的密钥关闭 windows。
这可能吗?
grace
和 suppress
似乎对每个分区使用了一个全局时间戳,因此不可能每个键都有不同的时间戳。
有可能禁用宽限期并使用自定义转换器而不是常规转换器 suppress
来实现按键抑制。
例如,这是我们代码的一部分:
KStream<String, ...> aggregatedStream = pairsStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.aggregate(...your aggregation logic...)
.toStream()
.flatTransform(new TransformerSupplier<Windowed<String>, AggregateOutput, Iterable<KeyValue<String, SuppressedOutput>>>() {
@Override
public Transformer<Windowed<String>, AggregateOutput, Iterable<KeyValue<String, SuppressedOutput>>> get() {
return new Transformer<Windowed<String>, AggregateOutput, Iterable<KeyValue<String, SuppressedOutput>>>() {
KeyValueStore<String, SuppressedOutput> store;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
store = (KeyValueStore<String, SuppressedOutput>) context.getStateStore("suppress-store");
}
@Override
public Iterable<KeyValue<String, SuppressedOutput>> transform(Windowed<String> window, AggregateOutput sequenceList) {
String messageKey = window.key();
long windowEndTimestamp = window.window().endTime().toEpochMilli();
SuppressedOutput currentSuppressedOutput = new SuppressedOutput(windowEndTimestamp, sequenceList);
SuppressedOutput storeValue = store.get(messageKey);
if (storeValue == null) {
// First time we receive a window for that key
}
if (windowEndTimestamp > storeValue.getTimestamp()) {
// Received a new window
}
if (windowEndTimestamp < storeValue.getTimestamp()) {
// Window older than the last window we've received
}
store.put(messageKey, currentSuppressedOutput);
return new ArrayList<>();
}
@Override
public void close() {
}
};
}
}, "suppress-store")
我目前有一个简单的数据流,例如:
|-----|--------|-------|
| Key | TS(ms) | Value |
|-----|--------|-------|
| A | 1000 | 0 |
| A | 1000 | 0 |
| A | 61000 | 0 |
| A | 61000 | 0 |
| A | 121000 | 0 |
| A | 121000 | 0 |
| A | 181000 | 10 |
| A | 181000 | 10 |
| A | 241000 | 10 |
| A | 241000 | 10 |
| B | 1000 | 0 |
| B | 1000 | 0 |
| B | 61000 | 0 |
| B | 61000 | 0 |
| B | 121000 | 0 |
| B | 121000 | 0 |
| B | 181000 | 10 |
| B | 181000 | 10 |
| B | 1000 | 10 |
| B | 241000 | 10 |
| B | 241000 | 10 |
|-----|--------|-------|
这也是我在主题中发布数据的顺序,值不是真正的整数而是 avro 值,但键是字符串。
我的代码是这样的:
KStream<Windowed<String>, Long> aggregatedStream = inputStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)).grace(Duration.ZERO))
.count()
.toStream();
aggregatedStream.print(Printed.toSysOut());
print
的输出是:
[KTABLE-TOSTREAM-0000000003]: [A@0/60000], 1
[KTABLE-TOSTREAM-0000000003]: [A@0/60000], 2
[KTABLE-TOSTREAM-0000000003]: [A@60000/120000], 1
[KTABLE-TOSTREAM-0000000003]: [A@60000/120000], 2
[KTABLE-TOSTREAM-0000000003]: [A@120000/180000], 1
[KTABLE-TOSTREAM-0000000003]: [A@120000/180000], 2
[KTABLE-TOSTREAM-0000000003]: [A@180000/240000], 1
[KTABLE-TOSTREAM-0000000003]: [A@180000/240000], 2
[KTABLE-TOSTREAM-0000000003]: [A@240000/300000], 1
[KTABLE-TOSTREAM-0000000003]: [A@240000/300000], 2
[KTABLE-TOSTREAM-0000000003]: [B@240000/300000], 1
[KTABLE-TOSTREAM-0000000003]: [B@240000/300000], 2
似乎宽限期独立于流的密钥在全球范围内适用,我期望的是(如果可能的话)接收密钥 A 的所有 10 window 计数和 10 window 键 B 的计数。 在某种程度上,宽限期仅根据流的密钥关闭 windows。 这可能吗?
grace
和 suppress
似乎对每个分区使用了一个全局时间戳,因此不可能每个键都有不同的时间戳。
有可能禁用宽限期并使用自定义转换器而不是常规转换器 suppress
来实现按键抑制。
例如,这是我们代码的一部分:
KStream<String, ...> aggregatedStream = pairsStream
.groupByKey()
.windowedBy(TimeWindows.of(Duration.ofMinutes(1)))
.aggregate(...your aggregation logic...)
.toStream()
.flatTransform(new TransformerSupplier<Windowed<String>, AggregateOutput, Iterable<KeyValue<String, SuppressedOutput>>>() {
@Override
public Transformer<Windowed<String>, AggregateOutput, Iterable<KeyValue<String, SuppressedOutput>>> get() {
return new Transformer<Windowed<String>, AggregateOutput, Iterable<KeyValue<String, SuppressedOutput>>>() {
KeyValueStore<String, SuppressedOutput> store;
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
store = (KeyValueStore<String, SuppressedOutput>) context.getStateStore("suppress-store");
}
@Override
public Iterable<KeyValue<String, SuppressedOutput>> transform(Windowed<String> window, AggregateOutput sequenceList) {
String messageKey = window.key();
long windowEndTimestamp = window.window().endTime().toEpochMilli();
SuppressedOutput currentSuppressedOutput = new SuppressedOutput(windowEndTimestamp, sequenceList);
SuppressedOutput storeValue = store.get(messageKey);
if (storeValue == null) {
// First time we receive a window for that key
}
if (windowEndTimestamp > storeValue.getTimestamp()) {
// Received a new window
}
if (windowEndTimestamp < storeValue.getTimestamp()) {
// Window older than the last window we've received
}
store.put(messageKey, currentSuppressedOutput);
return new ArrayList<>();
}
@Override
public void close() {
}
};
}
}, "suppress-store")