Why is the Kafka producer closed on every commit in Flink EXACTLY_ONCE mode?
I am using flink-connector-kafka in my Flink application with the semantic set to EXACTLY_ONCE, and the log keeps printing that the Kafka producer is being closed and reconnected, like this:
Closing the Kafka producer with timeoutMillis = 0 ms.
Proceeding to force close the producer since pending requests could not be completed within timeout 0 ms.
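For context, the sink is created roughly as follows; the broker address, topic name, and transaction timeout here are simplified placeholders rather than the exact values from my job:

import java.nio.charset.StandardCharsets;
import java.util.Properties;

import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer;
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ExactlyOnceKafkaSink {

    public static FlinkKafkaProducer<String> build() {
        Properties props = new Properties();
        props.setProperty("bootstrap.servers", "localhost:9092"); // placeholder broker
        // Keep this below the broker's transaction.max.timeout.ms (15 minutes by default).
        props.setProperty("transaction.timeout.ms", "900000");

        // Write each String record to the placeholder topic "my-topic".
        KafkaSerializationSchema<String> schema = (element, timestamp) ->
                new ProducerRecord<>("my-topic", element.getBytes(StandardCharsets.UTF_8));

        // EXACTLY_ONCE makes the sink write inside Kafka transactions that are
        // committed when the corresponding Flink checkpoint completes.
        return new FlinkKafkaProducer<>(
                "my-topic",
                schema,
                props,
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE);
    }
}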
I looked at the source code and found where the close call comes from: the commit function calls recycleTransactionalProducer in its finally block, and recycleTransactionalProducer calls close, which prints the log lines above. Why is the Kafka producer closed on every commit?
Source code from the following packages:
org.apache.flink.streaming.connectors.kafka;
org.apache.kafka.clients.producer;
@Override
protected void commit(FlinkKafkaProducer.KafkaTransactionState transaction) {
    if (transaction.isTransactional()) {
        try {
            transaction.producer.commitTransaction();
        } finally {
            recycleTransactionalProducer(transaction.producer);
        }
    }
}

private void recycleTransactionalProducer(FlinkKafkaInternalProducer<byte[], byte[]> producer) {
    availableTransactionalIds.add(producer.getTransactionalId());
    producer.flush();
    producer.close(Duration.ofSeconds(0));
}
private void close(Duration timeout, boolean swallowException) {
    long timeoutMs = timeout.toMillis();
    if (timeoutMs < 0)
        throw new IllegalArgumentException("The timeout cannot be negative.");
    log.info("Closing the Kafka producer with timeoutMillis = {} ms.", timeoutMs);
    ...
... when using exactly-once semantics with the FlinkKafkaProducer, there is a fixed-size pool of short-lived Kafka producers, one created for each concurrent checkpoint.
When a checkpoint begins, the FlinkKafkaProducer creates a new producer for that checkpoint. Once that checkpoint completes, the producer for it is closed and its transactional id is recycled back into the pool.
So it is normal to see logs of Kafka producers being closed if you're using an exactly-once transactional FlinkKafkaProducer.
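The size of that pool is configurable. As a sketch (assuming the constructor overload that takes a kafkaProducersPoolSize argument, and reusing the schema and props from the setup sketch in the question), making the pool size explicit looks like this:

// Same sink as in the question's sketch, with the producer pool size spelled out.
// A transactional id is taken from this fixed-size pool when a checkpoint's
// transaction starts and is put back by recycleTransactionalProducer on commit.
FlinkKafkaProducer<String> sink = new FlinkKafkaProducer<>(
        "my-topic",
        schema,   // KafkaSerializationSchema from the sketch in the question
        props,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE,
        FlinkKafkaProducer.DEFAULT_KAFKA_PRODUCERS_POOL_SIZE); // defaults to 5

Tuning the pool size only changes how many checkpoint transactions can be open at once; the close message itself is expected, because each producer is deliberately short-lived.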