Kafka 消费者 - 挂起的提取永远不会被删除并且轮询继续返回 0 条记录
Kafka consumer - pending fetch never gets removed and poll keeps on returning 0 records
我们编写了一个 Kafka 消费者,它根据配置轮询数据。每次轮询 returns 我们缓冲的大约 400 条 avro 记录。在缓冲区之后,我们对结束偏移进行查找。当缓冲区大小达到2000时,我们使用执行服务线程将它们写入HDFS,等待全部完成使用Future.get。我们一直在 HDFS(暂存文件夹)中附加相同的文件,直到我们达到 10k 的提交大小。达到提交大小后,我们将文件从暂存目录移动到 HDFS 中的最终输出位置(这是为原子提交完成的)。缓冲数据的下一次刷新将在暂存目录中创建新文件。这样做是为了在 HDFS 中没有小文件并限制打开的文件句柄。将文件提交到输出位置后,我们进行偏移量的异步提交。
5 分钟后,当 4-5 次成功写入和偏移提交完成后,我们开始获取此日志:
2020-05-24 06:29:19 TRACE Fetcher:1122 - [Consumer clientId=consumer-xxx.events-1, groupId=metadata.events] 跳过分区主题的提取。 metadata.events-1 因为之前对 kafka3-ckafka1-perf3-nvan.globalrelay.net:9093 (id: 3 rack: null) 的请求还没有被处理
我相信已向此节点发出了一些提取请求,其回调从未将其从挂起的提取中删除。
这是回调处理程序的一部分,我们将请求从挂起中删除:
https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L321
它阻止了我对 Kafka 消费者的可用性测试,因为我们在 5 分钟内每隔 运行 遇到这个障碍。记录消耗率从 1000 左右开始,当 poll returns 零记录时,它下降到 5rps。它永远不会恢复。线程数和 CPU 使用率也下降了。
问题是由于打开的文件句柄。这是因为 Avro 的 dataFileWriter api 处于追加模式。它采用可搜索的输入和输出流。当您关闭编写器时,它会关闭输出流但不会关闭可搜索的输入。由于所有数据节点上打开文件句柄的数量超过 44k,记录消耗率急剧下降。关闭可搜索输入明确解决了问题
我们编写了一个 Kafka 消费者,它根据配置轮询数据。每次轮询 returns 我们缓冲的大约 400 条 avro 记录。在缓冲区之后,我们对结束偏移进行查找。当缓冲区大小达到2000时,我们使用执行服务线程将它们写入HDFS,等待全部完成使用Future.get。我们一直在 HDFS(暂存文件夹)中附加相同的文件,直到我们达到 10k 的提交大小。达到提交大小后,我们将文件从暂存目录移动到 HDFS 中的最终输出位置(这是为原子提交完成的)。缓冲数据的下一次刷新将在暂存目录中创建新文件。这样做是为了在 HDFS 中没有小文件并限制打开的文件句柄。将文件提交到输出位置后,我们进行偏移量的异步提交。
5 分钟后,当 4-5 次成功写入和偏移提交完成后,我们开始获取此日志:
2020-05-24 06:29:19 TRACE Fetcher:1122 - [Consumer clientId=consumer-xxx.events-1, groupId=metadata.events] 跳过分区主题的提取。 metadata.events-1 因为之前对 kafka3-ckafka1-perf3-nvan.globalrelay.net:9093 (id: 3 rack: null) 的请求还没有被处理
我相信已向此节点发出了一些提取请求,其回调从未将其从挂起的提取中删除。
这是回调处理程序的一部分,我们将请求从挂起中删除: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/Fetcher.java#L321
它阻止了我对 Kafka 消费者的可用性测试,因为我们在 5 分钟内每隔 运行 遇到这个障碍。记录消耗率从 1000 左右开始,当 poll returns 零记录时,它下降到 5rps。它永远不会恢复。线程数和 CPU 使用率也下降了。
问题是由于打开的文件句柄。这是因为 Avro 的 dataFileWriter api 处于追加模式。它采用可搜索的输入和输出流。当您关闭编写器时,它会关闭输出流但不会关闭可搜索的输入。由于所有数据节点上打开文件句柄的数量超过 44k,记录消耗率急剧下降。关闭可搜索输入明确解决了问题