How to handle UnknownProducerIdException

We are having some problems with Spring Cloud and Kafka: sometimes our microservices throw an UnknownProducerIdException, caused by the broker-side parameter transactional.id.expiration.ms having expired.

My question is: can we catch that exception and retry the failed message? If so, what is the best way to handle it?

I have already looked at:
- https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820
- Kafka UNKNOWN_PRODUCER_ID exception

We are using Spring Cloud Hoxton.RELEASE and Spring Kafka 2.2.4.RELEASE.

We are using the AWS managed Kafka solution, so we cannot set a new value for the property mentioned above.

Here is a trace of the exception:

2020-04-07 20:54:00.563 ERROR 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2] The broker returned org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. for topic-partition test.produce.another-2 with producerId 35000, epoch 0, and sequence number 8
2020-04-07 20:54:00.563  INFO 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager   : [Producer clientId=producer-2] ProducerId set to -1 with epoch -1
2020-04-07 20:54:00.565 ERROR 5188 --- [ad | producer-2] o.s.k.support.LoggingProducerListener    : Exception thrown when sending a message with key='null' and payload='{...}' to topic <some-topic>:

To reproduce the exception:
- I used the Confluent Docker images and set the environment variable KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS to 10 seconds, so that I would not have to wait long for the exception to be thrown.
- In a separate process, I sent one message every 10 seconds to the topic the Java application listens to (a minimal sketch of such a producer follows this list).
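For illustration, here is a minimal sketch of what that second process could look like. It is an assumption, not the exact script used: it uses the plain Kafka client, the topic name test.produce and the KAFKA_HOST variable from the configuration below.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

public class ReproProducer {
    public static void main(String[] args) throws InterruptedException {
        Properties props = new Properties();
        props.put("bootstrap.servers", System.getenv("KAFKA_HOST"));
        props.put("key.serializer", StringSerializer.class.getName());
        props.put("value.serializer", StringSerializer.class.getName());

        try (KafkaProducer<String, String> producer = new KafkaProducer<>(props)) {
            for (int i = 0; ; i++) {
                // One message every 10 seconds, so the Spring application's own
                // (transactional) producer stays idle longer than the 10 s
                // transactional.id.expiration.ms and the broker drops its metadata.
                producer.send(new ProducerRecord<>("test.produce", "message-" + i));
                producer.flush();
                Thread.sleep(10_000);
            }
        }
    }
}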

Here is a code example:

File Bindings.java:

import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;

public interface Bindings {
  @Input("test-input")
  SubscribableChannel testListener();

  @Output("test-output")
  MessageChannel testProducer();
}

File application.yml (do not forget to set the KAFKA_HOST environment variable):

spring:
  cloud:
    stream:
      kafka:
        binder:
          auto-create-topics: true
          brokers: ${KAFKA_HOST}
          transaction:
            producer:
              error-channel-enabled: true
          producer-properties:
            acks: all
            retry.backoff.ms: 200
            linger.ms: 100
            max.in.flight.requests.per.connection: 1
            enable.idempotence: true
            retries: 3
            compression.type: snappy
            request.timeout.ms: 5000
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            session.timeout.ms: 20000
            max.poll.interval.ms: 350000
            enable.auto.commit: true
            allow.auto.create.topics: true
            auto.commit.interval.ms: 12000
            max.poll.records: 5
            isolation.level: read_committed
          configuration:
            auto.offset.reset: latest

      bindings:

        test-input:
          # contentType: text/plain
          destination: test.produce
          group: group-input
          consumer:
            maxAttempts: 3
            startOffset: latest
            autoCommitOnError: true
            queueBufferingMaxMessages: 100000
            autoCommitOffset: true


        test-output:
          # contentType: text/plain
          destination: test.produce.another
          group: group-output
          producer:
            acks: all

debug: true

The listener handler:

@SpringBootApplication
@EnableBinding(Bindings.class)
public class PocApplication {

    private static final Logger log = LoggerFactory.getLogger(PocApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(PocApplication.class, args);
    }


    @Autowired
    private BinderAwareChannelResolver binderAwareChannelResolver;

    @StreamListener(Topics.TESTLISTENINPUT)
    public void listen(Message<?> in, String headerKey) {
        final MessageBuilder builder;
        MessageChannel messageChannel;

        messageChannel = this.binderAwareChannelResolver.resolveDestination("test-output");

        Object payload = in.getPayload();
        builder = MessageBuilder.withPayload(payload);

        try {
            log.info("Event received: {}", in);

            if (!messageChannel.send(builder.build())) {
                log.error("Something happend trying send the message! {}", in.getPayload());
            }

            log.info("Commit success");
        } catch (UnknownProducerIdException e) {
            log.error("UnknownProducerIdException caught", e);
        } catch (KafkaException e) {
            log.error("KafkaException caught", e);
        } catch (Exception e) {
            System.out.println("Commit failed " + e.getMessage());
        }
    }
}

Regards

        } catch (UnknownProducerIdException e) {
            log.error("UnknownProducerIdException caught", e);

To catch the exception there, you need to set the sync Kafka producer property (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.3.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#kafka-producer-properties). Otherwise, the error is returned asynchronously.

You should not "eat" the exception there; it must be thrown back to the container so that the container can roll back the transaction.

Also,

        } catch (Exception e) {
            System.out.println("Commit failed " + e.getMessage());
        }

The commit is performed by the container after the stream listener returns to the container, so you will never see a commit error here; again, you must let the exception propagate back to the container.

The container will retry the delivery according to the consumer binding's retry configuration (for example, the maxAttempts: 3 on the test-input binding above).
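
Putting those points together, here is a minimal sketch of the listener, based on the question's PocApplication and assuming the output binding's producer is configured with the sync property from the reference linked above; it simply lets any failure propagate:

@StreamListener(Topics.TESTLISTENINPUT)
public void listen(Message<?> in) {
    log.info("Event received: {}", in);

    MessageChannel messageChannel =
            this.binderAwareChannelResolver.resolveDestination("test-output");

    // No try/catch around the send: with a sync producer, a failed send (including
    // UnknownProducerIdException) is thrown here, reaches the container, rolls back
    // the transaction, and the delivery is retried per the consumer binding's
    // retry settings.
    if (!messageChannel.send(MessageBuilder.withPayload(in.getPayload()).build())) {
        throw new IllegalStateException("Send failed for payload " + in.getPayload());
    }
}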

You might also be able to use a callback to handle the exception. I am not familiar with the Spring framework library for Kafka, but if you use the plain Kafka client you can do something like this:

// Assumes `producer`, `record`, `logger` and a `retries` count are defined elsewhere.
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            logger.error("Send failed", e);

            if (e instanceof UnknownProducerIdException) {
                logger.info("UnknownProducerIdException caught");
                // Naive retry: re-send the same record up to `retries` more times.
                for (int attempt = 0; attempt < retries; attempt++) {
                    producer.send(record);
                }
            }
        } else {
            logger.info("The offset of the record we just sent is: " + metadata.offset());
        }
    }
});
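
Note that re-sending from the callback like this is a brand-new send, so even with enable.idempotence=true it can produce duplicates; in the Spring Cloud Stream setup above, letting the exception propagate and relying on the binding's retry configuration is the simpler option.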