Kafka Stream 修复 window 不按键分组
Kafka Stream fixed window not grouping by key
我得到了一个 Kafka Stream。如何在特定时间累积消息 window 而不管密钥?
我的用例是在不考虑密钥的情况下每 10 分钟从流中写入一个文件。
您需要使用 Transformer
with a state store and schedule a punctuation call 每 10 分钟浏览一次商店并发出记录。当您在状态存储中收集记录时,转换器应该 return null
,因此您还需要在转换器之后使用过滤器来忽略任何 null
记录。
这是我认为接近您要求的内容的简单示例。让我知道进展如何。
class WindowedTransformerExample {
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
final String stateStoreName = "stateStore";
final StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(stateStoreName),
Serdes.String(),
Serdes.String());
builder.addStateStore(keyValueStoreBuilder);
builder.<String, String>stream("topic").transform(new WindowedTransformer(stateStoreName), stateStoreName)
.filter((k, v) -> k != null && v != null)
// Here's where you do something with records emitted after 10 minutes
.foreach((k, v)-> System.out.println());
}
static final class WindowedTransformer implements TransformerSupplier<String, String, KeyValue<String, String>> {
private final String storeName;
public WindowedTransformer(final String storeName) {
this.storeName = storeName;
}
@Override
public Transformer<String, String, KeyValue<String, String>> get() {
return new Transformer<String, String, KeyValue<String, String>>() {
private KeyValueStore<String, String> keyValueStore;
private ProcessorContext processorContext;
@Override
public void init(final ProcessorContext context) {
processorContext = context;
keyValueStore = (KeyValueStore<String, String>) context.getStateStore(storeName);
// could change this to PunctuationType.STREAM_TIME if needed
context.schedule(Duration.ofMinutes(10), PunctuationType.WALL_CLOCK_TIME, (ts) -> {
try(final KeyValueIterator<String, String> iterator = keyValueStore.all()) {
while (iterator.hasNext()) {
final KeyValue<String, String> keyValue = iterator.next();
processorContext.forward(keyValue.key, keyValue.value);
}
}
});
}
@Override
public KeyValue<String, String> transform(String key, String value) {
if (key != null) {
keyValueStore.put(key, value);
}
return null;
}
@Override
public void close() {
}
};
}
}
}
我得到了一个 Kafka Stream。如何在特定时间累积消息 window 而不管密钥?
我的用例是在不考虑密钥的情况下每 10 分钟从流中写入一个文件。
您需要使用 Transformer
with a state store and schedule a punctuation call 每 10 分钟浏览一次商店并发出记录。当您在状态存储中收集记录时,转换器应该 return null
,因此您还需要在转换器之后使用过滤器来忽略任何 null
记录。
这是我认为接近您要求的内容的简单示例。让我知道进展如何。
class WindowedTransformerExample {
public static void main(String[] args) {
final StreamsBuilder builder = new StreamsBuilder();
final String stateStoreName = "stateStore";
final StoreBuilder<KeyValueStore<String, String>> keyValueStoreBuilder =
Stores.keyValueStoreBuilder(Stores.inMemoryKeyValueStore(stateStoreName),
Serdes.String(),
Serdes.String());
builder.addStateStore(keyValueStoreBuilder);
builder.<String, String>stream("topic").transform(new WindowedTransformer(stateStoreName), stateStoreName)
.filter((k, v) -> k != null && v != null)
// Here's where you do something with records emitted after 10 minutes
.foreach((k, v)-> System.out.println());
}
static final class WindowedTransformer implements TransformerSupplier<String, String, KeyValue<String, String>> {
private final String storeName;
public WindowedTransformer(final String storeName) {
this.storeName = storeName;
}
@Override
public Transformer<String, String, KeyValue<String, String>> get() {
return new Transformer<String, String, KeyValue<String, String>>() {
private KeyValueStore<String, String> keyValueStore;
private ProcessorContext processorContext;
@Override
public void init(final ProcessorContext context) {
processorContext = context;
keyValueStore = (KeyValueStore<String, String>) context.getStateStore(storeName);
// could change this to PunctuationType.STREAM_TIME if needed
context.schedule(Duration.ofMinutes(10), PunctuationType.WALL_CLOCK_TIME, (ts) -> {
try(final KeyValueIterator<String, String> iterator = keyValueStore.all()) {
while (iterator.hasNext()) {
final KeyValue<String, String> keyValue = iterator.next();
processorContext.forward(keyValue.key, keyValue.value);
}
}
});
}
@Override
public KeyValue<String, String> transform(String key, String value) {
if (key != null) {
keyValueStore.put(key, value);
}
return null;
}
@Override
public void close() {
}
};
}
}
}