Spring Cloud Stream 功能手册确认 - KafkaHeaders.ACKNOWLEDGMENT 不可用

Spring Cloud Stream Functional Manual Acknowledgement - KafkaHeaders.ACKNOWLEDGMENT not available

我正在使用 Spring Cloud Webflux 和 Spring Cloud Streams 功能接口来处理我的 kafka 处理。

如果我按顺序进行处理并且终止应用程序,它会返回要处理的消息,该消息按预期工作,因为没有消息丢失。但是,如果我尝试并行执行,它似乎正在确认 Kafka,这是可以理解的,因为它现在是一个单独的线程,因此想转向手动确认。

我的代码:

  1. application.yml(相关部分)
spring:
  cloud:
    stream:
      kafka:
        binder:
          brokers: localhost:9092
          autoAddPartitions: true
          minPartitionCount: 2         
      bindings:
          receiver-in-0:
               binder: kafka
              destination: topic-1
              content-type: text/plain;charset=UTF-8
              group: input-group-1
              consumer:
                  autoCommitOffset: false
spring.cloud.stream.function.definition: receiver
  1. 收件人代码
public Consumer<Flux<Message<String>>> receiver() throws IOException {
        return (sink -> {
            sink
            .onBackpressureBuffer()
            .parallel(4)
            .runOn(Schedulers.parallel())
            .subscribe((record)->{  
                Flux<Action> executor = new 
                           //Internal code which does transformation and provides a flux for execution (names changed)
 IncomingMessage().process(record);

                if(executor != null) {
                    Disposable disposable=null;
                    disposable= executor.subscribe(
                            (action)->{

                                try {
                                   //Process execute does the processing on the modified data (names changed)
                                    Process.execute(action);
                                    Acknowledgment acknowledgment = record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class);
                                    if(acknowledgment !=null) {
                                        acknowledgment.acknowledge();
                                    }
                                }
                                catch(Exception e) {

                                log.fatal(e.getMsg());
                                }
                            },
                            (e)->{

                                log.fatal(e.getMsg());
                                }
                            });
                    if(disposable != null) {
                        disposable.dispose();
                    }

                }

            });

        });
    }

这里 record.getHeaders().get(KafkaHeaders.ACKNOWLEDGMENT, Acknowledgment.class); 行总是给出 null,所以我假设 autoCommitOffset: false 不起作用,我尝试将以下配置也放在绑定部分但无济于事。

 receiver-in-0:
               binder: kafka
              destination: topic-1
              content-type: text/plain;charset=UTF-8
              group: input-group-1
              autoCommitOffset: false

我的要求是,如果我终止应用程序,即使在并行情况下,它也应该继续从第一条未确认的消息中读取消息。

没有得到确认的问题 header 是因为标签的位置不正确。因为它纯粹是一个 kafka 活页夹 属性 。添加了以下 属性

spring:
  cloud:
    stream:
      kafka:
        bindings:
           receiver-in-0:
              consumer:
                   autoCommitOffset: false

频道名称应与函数调用中的频道名称相同。 或者默认值,可以设置。

 spring:
    cloud:
      stream:
        kafka:
           default:
            consumer:
               autoCommitOffset: false

然而,并行处理传入的流量可能不是一个好主意,因为消息可能会被丢弃,因为一些稍后的消息可以得到确认。它需要更多的确认逻辑,而不仅仅是设置参数确认。