当你在 Kafka Consumer/Client 上得到 "too many open files" 是什么意思?

What does it mean when you get "too many open files" on the Kafka Consumer/Client?

我知道这通常意味着需要增加 ulimit。但是,当它发生在消费者方面时,这实际上意味着什么?

我正在使用 Apache Flink,但在我的 Flink 任务节点上遇到了这个错误。当我重新启动我的 Flink 节点并重新部署作业时,它运行良好。经纪人当时好像也还好。

我在 3 个节点上总共有 9 个任务 运行。任何一项作业的最大并行度是 1 到 2。所以让我们假设最坏的情况是 18 parallelism/threads 超过 3 个节点。

org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:799)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:650)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:630)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaPartitionDiscoverer.initializeConnections(KafkaPartitionDiscoverer.java:58)
at org.apache.flink.streaming.connectors.kafka.internals.AbstractPartitionDiscoverer.open(AbstractPartitionDiscoverer.java:94)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.open(FlinkKafkaConsumerBase.java:504)
at org.apache.flink.api.common.functions.util.FunctionUtils.openFunction(FunctionUtils.java:36)
at org.apache.flink.streaming.api.operators.AbstractUdfStreamOperator.open(AbstractUdfStreamOperator.java:102)
at org.apache.flink.streaming.runtime.tasks.StreamTask.openAllOperators(StreamTask.java:424)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:290)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:711)
at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.kafka.common.KafkaException: java.io.IOException: Too many open files
at org.apache.kafka.common.network.Selector.<init>(Selector.java:154)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:188)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:192)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:722)
... 11 more
Caused by: java.io.IOException: Too many open files
at sun.nio.ch.IOUtil.makePipe(Native Method)
at sun.nio.ch.EPollSelectorImpl.<init>(EPollSelectorImpl.java:65)
at sun.nio.ch.EPollSelectorProvider.openSelector(EPollSelectorProvider.java:36)
at java.nio.channels.Selector.open(Selector.java:227)
at org.apache.kafka.common.network.Selector.<init>(Selector.java:152)
... 14 more

每个 Kafka 客户端(生产者、消费者)为其连接的集群中的每个代理维护一个套接字(最坏情况)。

因此您正在查看 flink 创建的客户端数量乘以集群中代理的数量

出于 ulimit 的目的,套接字算作句柄。

我不知道 flink 在内部创建了多少个 kafka 客户端 - 你可以获取堆转储并查看其中有多少个客户端对象