Flink 作业 运行 在 yarn 上成功但在 Kubernetes 上内存不足
Flink job ran successfully on yarn but out of memory on Kubernetes
我们有一个 Flink 作业,它从 hive 读取数据并与来自 kafka 的流数据连接。
它可以在 Yarn 上 运行 成功,但是当我们 运行 在具有完全相同内存设置的 Kubernetes 上时,它失败并出现错误
java.io.IOException: Insufficient number of network buffers: required 2, but only 1 available. The total number of network buffers is currently set to 57343 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
\tat org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:340)
\tat org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:322)
\tat org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory[=11=](ResultPartitionFactory.java:215)
\tat org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:139)
\tat org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator.setup(ConsumableNotifyingResultPartitionWriterDecorator.java:88)
\tat org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:869)
\tat org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:635)
\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:543)
\tat java.lang.Thread.run(Thread.java:748)
我按照说明增加了taskmanager.memory.network.fraction,然后因为OOM失败了:
Caused by: java.lang.OutOfMemoryError: Java heap space
\tat java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
\tat java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
\tat di.flink.shadow.org.apache.parquet.bytes.HeapByteBufferAllocator.allocate(HeapByteBufferAllocator.java:32)
\tat di.flink.shadow.org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1166)
\tat di.flink.shadow.org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
\tat org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:226)
\tat org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
\tat org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
\tat org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
\tat org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:719)
\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:543)
\tat java.lang.Thread.run(Thread.java:748)
我什至将 Kubernetes 上的任务管理器进程大小从 16gb 增加到 32gb,仍然出现相同的错误,通过查看 Kubernetes pod 资源使用指标,有 3-5 pods 消耗更多内存高于平均水平,并且他们的内存使用量在 运行 时间内持续增长。
我想知道 Kubernetes 上的内存使用是否存在任何已知问题,尤其是网络缓冲区,我在哪里可以检查此类指标以进行调试?
我在 docker 入口点脚本中发现了问题,
任务管理器会做
TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
在 flink-conf 中覆盖 taskmanager.numberOfTaskSlots,但是在 /proc/cpuinfo 中包含所有物理 cpu 核心,但不仅是分配给容器的核心,所以在我的例子中,taskmanager.numberOfTaskSlots 已设置为 32,导致一些容器需要完成大部分工作,而其余的则空闲。
我们有一个 Flink 作业,它从 hive 读取数据并与来自 kafka 的流数据连接。
它可以在 Yarn 上 运行 成功,但是当我们 运行 在具有完全相同内存设置的 Kubernetes 上时,它失败并出现错误
java.io.IOException: Insufficient number of network buffers: required 2, but only 1 available. The total number of network buffers is currently set to 57343 of 32768 bytes each. You can increase this number by setting the configuration keys 'taskmanager.memory.network.fraction', 'taskmanager.memory.network.min', and 'taskmanager.memory.network.max'.
\tat org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.internalCreateBufferPool(NetworkBufferPool.java:340)
\tat org.apache.flink.runtime.io.network.buffer.NetworkBufferPool.createBufferPool(NetworkBufferPool.java:322)
\tat org.apache.flink.runtime.io.network.partition.ResultPartitionFactory.lambda$createBufferPoolFactory[=11=](ResultPartitionFactory.java:215)
\tat org.apache.flink.runtime.io.network.partition.ResultPartition.setup(ResultPartition.java:139)
\tat org.apache.flink.runtime.taskmanager.ConsumableNotifyingResultPartitionWriterDecorator.setup(ConsumableNotifyingResultPartitionWriterDecorator.java:88)
\tat org.apache.flink.runtime.taskmanager.Task.setupPartitionsAndGates(Task.java:869)
\tat org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:635)
\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:543)
\tat java.lang.Thread.run(Thread.java:748)
我按照说明增加了taskmanager.memory.network.fraction,然后因为OOM失败了:
Caused by: java.lang.OutOfMemoryError: Java heap space
\tat java.nio.HeapByteBuffer.<init>(HeapByteBuffer.java:57)
\tat java.nio.ByteBuffer.allocate(ByteBuffer.java:335)
\tat di.flink.shadow.org.apache.parquet.bytes.HeapByteBufferAllocator.allocate(HeapByteBufferAllocator.java:32)
\tat di.flink.shadow.org.apache.parquet.hadoop.ParquetFileReader$ConsecutiveChunkList.readAll(ParquetFileReader.java:1166)
\tat di.flink.shadow.org.apache.parquet.hadoop.ParquetFileReader.readNextRowGroup(ParquetFileReader.java:805)
\tat org.apache.flink.formats.parquet.utils.ParquetRecordReader.readNextRecord(ParquetRecordReader.java:226)
\tat org.apache.flink.formats.parquet.utils.ParquetRecordReader.reachEnd(ParquetRecordReader.java:207)
\tat org.apache.flink.formats.parquet.ParquetInputFormat.reachedEnd(ParquetInputFormat.java:233)
\tat org.apache.flink.runtime.operators.DataSourceTask.invoke(DataSourceTask.java:193)
\tat org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:719)
\tat org.apache.flink.runtime.taskmanager.Task.run(Task.java:543)
\tat java.lang.Thread.run(Thread.java:748)
我什至将 Kubernetes 上的任务管理器进程大小从 16gb 增加到 32gb,仍然出现相同的错误,通过查看 Kubernetes pod 资源使用指标,有 3-5 pods 消耗更多内存高于平均水平,并且他们的内存使用量在 运行 时间内持续增长。
我想知道 Kubernetes 上的内存使用是否存在任何已知问题,尤其是网络缓冲区,我在哪里可以检查此类指标以进行调试?
我在 docker 入口点脚本中发现了问题, 任务管理器会做
TASK_MANAGER_NUMBER_OF_TASK_SLOTS=${TASK_MANAGER_NUMBER_OF_TASK_SLOTS:-$(grep -c ^processor /proc/cpuinfo)}
在 flink-conf 中覆盖 taskmanager.numberOfTaskSlots,但是在 /proc/cpuinfo 中包含所有物理 cpu 核心,但不仅是分配给容器的核心,所以在我的例子中,taskmanager.numberOfTaskSlots 已设置为 32,导致一些容器需要完成大部分工作,而其余的则空闲。