如何让 Kafka Streams 每 1 小时每个键发送一条记录 window?

How to let Kafka Streams send one record per key per 1hour window?

我正在编写 Kafka Streams 应用程序。它执行以下步骤" 1)消费输入数据 2) 在 1 小时内根据新密钥对记录进行重复数据删除 window 3)重新选择密钥 4) 在 1 小时内计算密钥 window 5) 发送到下游。

我是 Kafka Streams 的新手。我的理解是,为了将 window 保持为 1 小时,我将 commit.interval.ms 也设置为 1 小时。这是正确的做法吗?

一旦我用实际流量部署我的应用程序,应用程序似乎一直在发送消息,而我认为它每小时只会发送一堆消息?

感谢任何帮助!!

我的配置:

commit.interval.ms = 3600000
request.timeout.ms = 600000
retries = 20
retry.backoff.ms = 1000    
cache.max.bytes.buffering = 10485760

// dedupe by new key per window(1hr)
 stream = inputStream
        .selectKey(... )
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
        // only keep the latest event for each customized key
        .reduce((event1, event2) -> event2)
        .toStream()
        .groupByKey()
        .windowedBy(TimeWindows.of(TimeUnit.MINUTES.toMillis(60)))
        .reduce((event1, event2) -> {
            long count1 = event1.getCount();
            long count2 = event2.getCount();
            event2.setCount(count1 + count2);
            return event2;
        })
        .toStream()
        .to(OUTPUT_TOPIC);
  1. 我建议您使用最新版本的 kafka 提供的 exactly-once 保证。使用它,您不必担心 de-duplicating 消息。 https://www.baeldung.com/kafka-exactly-once
  2. 配置生产者配置:特别是 buffer.memory & linger.ms。 (您也可以查看 batch.size)(查看 https://kafka.apache.org/documentation/#producerconfigs 了解更多信息)

I'm new to Kafka Streams. My understanding is, in order to keep the window as 1 hr, I set the commit.interval.ms to be 1hr as well. Is this the right thing to do?

提交间隔与你的处理逻辑无关。

您可能需要查看 suppress() 运算符。此外,以下块 post 可能会有所帮助:

Kafka Streams 的处理模型是连续的,默认发送连续的结果更新。这就是为什么基本上每个输入消息都会得到一个输出消息,因为处理输入消息会修改结果。