How to handle UnknownProducerIdException
We are having some problems using Spring Cloud and Kafka: sometimes our microservices throw an UnknownProducerIdException. This is caused by the producer metadata expiring on the broker side once the broker parameter transactional.id.expiration.ms elapses.
My question is: is it possible to catch that exception and retry the failed message? If so, what is the best way to handle it?
I have already looked at:
- https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=89068820
- Kafka UNKNOWN_PRODUCER_ID exception
We are using Spring Cloud Hoxton.RELEASE with Spring Kafka version 2.2.4.RELEASE.
We are using AWS's managed Kafka solution, so we cannot set a new value for the broker property mentioned above.
Here are some traces of the exception:
2020-04-07 20:54:00.563 ERROR 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2] The broker returned org.apache.kafka.common.errors.UnknownProducerIdException: This exception is raised by the broker if it could not locate the producer metadata associated with the producerId in question. This could happen if, for instance, the producer's records were deleted because their retention time had elapsed. Once the last records of the producerId are removed, the producer's metadata is removed from the broker, and future appends by the producer will return this exception. for topic-partition test.produce.another-2 with producerId 35000, epoch 0, and sequence number 8
2020-04-07 20:54:00.563 INFO 5188 --- [ad | producer-2] o.a.k.c.p.internals.TransactionManager : [Producer clientId=producer-2] ProducerId set to -1 with epoch -1
2020-04-07 20:54:00.565 ERROR 5188 --- [ad | producer-2] o.s.k.support.LoggingProducerListener : Exception thrown when sending a message with key='null' and payload='{...}' to topic <some-topic>:
To reproduce this exception:
- I used the Confluent Docker images and set the environment variable KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS to 10 seconds, so I would not have to wait long for the exception to be thrown (a sketch of such a setup follows this list).
- In a separate process, I sent one message every 10 seconds to the topic the Java application listens on.
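For reference, here is a minimal docker-compose sketch of such a broker setup. The Confluent images translate KAFKA_* environment variables into broker properties, so KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS becomes transactional.id.expiration.ms; the image versions and listener settings below are assumptions, not the exact setup used:

version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.0
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
  kafka:
    image: confluentinc/cp-kafka:5.4.0
    depends_on:
      - zookeeper
    ports:
      - "9092:9092"
    environment:
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://localhost:9092
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      # 10 s expiry so the UnknownProducerIdException shows up quickly
      KAFKA_TRANSACTIONAL_ID_EXPIRATION_MS: 10000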
Here is a code sample:
File Bindings.java:
import org.springframework.cloud.stream.annotation.Input;
import org.springframework.cloud.stream.annotation.Output;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.SubscribableChannel;
public interface Bindings {

    @Input("test-input")
    SubscribableChannel testListener();

    @Output("test-output")
    MessageChannel testProducer();
}
File application.yml (don't forget to set the KAFKA_HOST environment variable):
spring:
  cloud:
    stream:
      kafka:
        binder:
          auto-create-topics: true
          brokers: ${KAFKA_HOST}
          transaction:
            producer:
              error-channel-enabled: true
          producer-properties:
            acks: all
            retry.backoff.ms: 200
            linger.ms: 100
            max.in.flight.requests.per.connection: 1
            enable.idempotence: true
            retries: 3
            compression.type: snappy
            request.timeout.ms: 5000
            key.serializer: org.apache.kafka.common.serialization.StringSerializer
          consumer-properties:
            session.timeout.ms: 20000
            max.poll.interval.ms: 350000
            enable.auto.commit: true
            allow.auto.create.topics: true
            auto.commit.interval.ms: 12000
            max.poll.records: 5
            isolation.level: read_committed
          configuration:
            auto.offset.reset: latest
      bindings:
        test-input:
          # contentType: text/plain
          destination: test.produce
          group: group-input
          consumer:
            maxAttempts: 3
            startOffset: latest
            autoCommitOnError: true
            queueBufferingMaxMessages: 100000
            autoCommitOffset: true
        test-output:
          # contentType: text/plain
          destination: test.produce.another
          group: group-output
          producer:
            acks: all

debug: true
The listener handler:
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.UnknownProducerIdException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.binding.BinderAwareChannelResolver;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageChannel;
import org.springframework.messaging.support.MessageBuilder;

@SpringBootApplication
@EnableBinding(Bindings.class)
public class PocApplication {

    private static final Logger log = LoggerFactory.getLogger(PocApplication.class);

    public static void main(String[] args) {
        SpringApplication.run(PocApplication.class, args);
    }

    @Autowired
    private BinderAwareChannelResolver binderAwareChannelResolver;

    @StreamListener(Topics.TESTLISTENINPUT)
    public void listen(Message<?> in, String headerKey) {
        final MessageBuilder<Object> builder;
        MessageChannel messageChannel;

        messageChannel = this.binderAwareChannelResolver.resolveDestination("test-output");
        Object payload = in.getPayload();
        builder = MessageBuilder.withPayload(payload);
        try {
            log.info("Event received: {}", in);
            if (!messageChannel.send(builder.build())) {
                log.error("Something happened trying to send the message! {}", in.getPayload());
            }
            log.info("Commit success");
        } catch (UnknownProducerIdException e) {
            log.error("UnknownProducerIdException caught ", e);
        } catch (KafkaException e) {
            log.error("KafkaException caught ", e);
        } catch (Exception e) {
            System.out.println("Commit failed " + e.getMessage());
        }
    }
}
Regards
} catch (UnknownProducerIdException e) {
    log.error("UnknownProducerIdException caught ", e);
To catch the exception there, you need to set the sync Kafka producer property (https://cloud.spring.io/spring-cloud-static/spring-cloud-stream-binder-kafka/3.0.3.RELEASE/reference/html/spring-cloud-stream-binder-kafka.html#kafka-producer-properties). Otherwise, the error is returned asynchronously.
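For example, a minimal sketch of enabling it for the question's output binding (the property path is per the linked binder reference; the binding name test-output is taken from the question's configuration):

spring:
  cloud:
    stream:
      kafka:
        bindings:
          test-output:
            producer:
              sync: true

With sync: true, send() blocks until the broker acknowledges or rejects the record, so producer errors surface in the listener thread instead of being reported asynchronously.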
You should not "eat" the exception there; it must be thrown back to the container so that the container rolls back the transaction.
Also,
} catch (Exception e) {
    System.out.println("Commit failed " + e.getMessage());
}
The commit is performed by the container after the stream listener returns to the container, so you will never see a commit error here; again, you must let the exception propagate back to the container.
The container will retry the delivery according to the retry configuration of the consumer binding.
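Putting both points together, a minimal sketch of the listener with the try/catch removed (reusing the resolver and constants from the question; the shape of the fix, not a definitive implementation):

@StreamListener(Topics.TESTLISTENINPUT)
public void listen(Message<?> in) {
    log.info("Event received: {}", in);
    MessageChannel messageChannel =
            this.binderAwareChannelResolver.resolveDestination("test-output");
    // No try/catch: with sync=true, a failure such as UnknownProducerIdException
    // is thrown here, propagates to the container, rolls back the transaction,
    // and the delivery is retried up to the binding's maxAttempts (3 above).
    if (!messageChannel.send(MessageBuilder.withPayload(in.getPayload()).build())) {
        throw new IllegalStateException("Send returned false for: " + in.getPayload());
    }
}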
You might also be able to use a callback to handle the exception. I don't know the Spring framework library for Kafka, but if you are using the plain Kafka client, you can do something like this:
// `producer`, `record`, `logger`, and the `send(topic, partition, msg)` retry
// helper are assumed to exist in the enclosing scope; `retry` must be a field
// (a captured local variable would have to be effectively final).
producer.send(record, new Callback() {
    @Override
    public void onCompletion(RecordMetadata metadata, Exception e) {
        if (e != null) {
            e.printStackTrace();
            if (e instanceof UnknownProducerIdException) {
                logger.info("UnknownProducerIdException caught");
                // Re-send the failed message while the retry budget lasts
                while (--retry >= 0) {
                    send(topic, partition, msg);
                }
            }
        } else {
            logger.info("The offset of the record we just sent is: " + metadata.offset());
        }
    }
});