Partition count reduced at sink during Kafka Streams record forwarding

I am using Kafka Streams to process some Kafka records. I have two nodes: one performs a transformation, and the other is the final sink.

My topics are INTER_TOPIC and FINAL_TOPIC, each with 20 partitions. The producer that writes to INTER_TOPIC writes key-value records, and its partitioner is round robin.

Below is the code for my intermediate transformation node.

public void streamHandler() {

        Properties props = getKafkaProperties();

        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, String> processStream = builder.stream("INTER_TOPIC",
                Consumed.with(Serdes.String(), Serdes.String()));

        //processStream.peek((key,value)->System.out.println("key :"+key+" value :"+value));

        processStream
                .map((key, value) -> getTransformer().transform(key, value))
                .filter((key, value) -> filteroutFailedRequest(key, value))
                .to("FINAL_TOPIC", Produced.with(Serdes.String(), Serdes.String()));


        KafkaStreams IStreams = new KafkaStreams(builder.build(), props);

        IStreams.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() {
            @Override
            public void uncaughtException(Thread t, Throwable e) {

                logger.error("Thread Name :" + t.getName() + " Error while processing:", e);
            }
        });

        IStreams.cleanUp();
        IStreams.start();

        try {
            System.in.read();
        } catch (IOException e) {

            logger.error("Failed streaming ",e);
        }
    }

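But my sink receives data in only 2 partitions, even though I configured 20 stream threads and verified that my producer is writing to all 20 partitions. How can I tell whether my transformation node is forwarding to all 20 partitions of FINAL_TOPIC?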

30 Sep 2019 10:39:41,416 INFO  c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-3] Received
30 Sep 2019 10:39:41,416 INFO  c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-4] Received
30 Sep 2019 10:39:41,416 INFO  c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-3] Received
30 Sep 2019 10:39:41,416 INFO  c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-4] Received
30 Sep 2019 10:40:57,427 INFO  c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-3] Received
30 Sep 2019 10:40:57,427 INFO  c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-4] Received
30 Sep 2019 10:40:57,427 INFO  c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-3] Received
30 Sep 2019 10:40:57,427 INFO  c.j.m.s.StreamHandler [289] [streams-user-61a77203-9afc-4c66-843d-94c20a509793-StreamThread-4] Received

"and partitioner is round robin"

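Why do you think the partitioner is round robin? By default, Kafka Streams applies hash-based partitioning based on the key when it writes to a sink topic.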
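To illustrate why hash-based partitioning can leave most partitions empty, here is a minimal sketch in plain Java. It assumes, hypothetically, that the map step emits only two distinct keys ("OK" and "RETRY" are made-up values; the question does not show what transform returns). The hash used here is a simple stand-in, not the one Kafka actually applies to the serialized key, but the conclusion is the same for any deterministic hash: records with the same key always land on the same partition, so the number of partitions used can never exceed the number of distinct keys.

```java
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

// Illustration only: Arrays.hashCode is a stand-in for Kafka's real key hash.
class KeyHashDemo {

    // Map a key to a partition: non-negative hash of the key bytes, modulo the partition count.
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        int hash = Arrays.hashCode(keyBytes);        // stand-in hash (assumption)
        return (hash & 0x7fffffff) % numPartitions;  // clear the sign bit, then take the modulo
    }

    // Collect the set of partitions that a list of record keys would land on.
    static Set<Integer> partitionsUsed(List<String> keys, int numPartitions) {
        Set<Integer> used = new TreeSet<>();
        for (String key : keys) {
            used.add(partitionFor(key, numPartitions));
        }
        return used;
    }

    public static void main(String[] args) {
        // Hypothetical output keys: many records, but only two distinct key values.
        List<String> keys = Arrays.asList("OK", "RETRY", "OK", "OK", "RETRY", "OK");
        Set<Integer> used = partitionsUsed(keys, 20);
        System.out.println("Partitions used: " + used.size() + " of 20 -> " + used);
    }
}
```

If the real keys coming out of the map step look like this, seeing data in only 2 of the 20 FINAL_TOPIC partitions would be the expected result rather than a fault.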

If you want to change the default partitioner, you can implement the StreamPartitioner interface and pass it via:

Produced.with(Serdes.String(), Serdes.String())
        .withStreamPartitioner(...)
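As a rough sketch of what such a partitioner could look like, the class below picks partitions in round-robin order. To keep it runnable without Kafka on the classpath it does not declare the interface; its partition method mirrors the shape of StreamPartitioner<String, String>.partition(topic, key, value, numPartitions), so in a real project you would add `implements StreamPartitioner<String, String>`. The class name RoundRobinPartitioner is made up for this example, and newer Kafka versions may prefer a different method on the interface, so check the Javadoc for the version you use.

```java
import java.util.concurrent.atomic.AtomicInteger;

// Round-robin partition chooser (hypothetical name). With kafka-streams on the
// classpath, declare `implements StreamPartitioner<String, String>` on this class.
class RoundRobinPartitioner {

    // Shared counter so that concurrent calls still hand out successive partitions.
    private final AtomicInteger counter = new AtomicInteger(0);

    // Same parameters as StreamPartitioner#partition: the key and value are ignored here.
    public Integer partition(String topic, String key, String value, int numPartitions) {
        // floorMod keeps the result non-negative even after the counter overflows.
        return Math.floorMod(counter.getAndIncrement(), numPartitions);
    }
}
```

Once it implements the interface, an instance would go where the `...` is above, for example `.withStreamPartitioner(new RoundRobinPartitioner())`. Keep in mind that round robin gives up the guarantee that records with the same key end up in the same partition, which matters if anything downstream relies on per-key ordering.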