队列消费者应用程序中的 OutOfMemory 错误

OutOfMemory error in a queue consumer application

我有一个数据管道组件,它读取在 S3 上传触发器中生成的 SQS 消息,并为批处理管道组件解析和发布消息。 我最近观察到,在生产系统中,我的数据管道在重负载下一直因 OutOfMemory 错误而崩溃,但在本地以类似负载进行测试时它从未崩溃?批处理管道似乎从未在生产中崩溃过。

当我无法在本地复制它时,如何调试它?

由于我在 2 周后找到了上述问题的解决方案,我想我会记录下来以供其他人和我未来的自己使用。

我无法重现问题,因为 aws 命令-

aws s3 cp --递归目录 s3://input-queue/dir

不知何故上传消息的速度不够快,可能会对我的本地数据管道造成压力。所以我关闭了数据管道,一旦队列中有 10k SQS 消息,我就启动了它,正如预期的那样,它在处理 ~3000 条消息后因内存不足错误而崩溃。事实证明,管道能够处理连续的吞吐量,但在以 10k 消息负载开始时就中断了。

我的假设是问题的发生是因为 Java 垃圾收集无法在执行后正确清理对象。因此,我开始分析生成的堆转储,经过几天的研究,我偶然发现了内存不足错误的可能根本原因。我的 MessageHandlerTask class 有大约 5000 个实例,理想情况下它们应该在处理后进行 GC,而不是继续堆积。

对这一思路的进一步调查使我找到了根本原因 - 结果发现代码使用 Executors.newFixedThreadPool() 创建一个 ExecutorService 来提交任务。这个实现使用了一个无界的任务队列,所以如果提交的任务太多,所有任务都在队列中等待,占用大量内存。 现实是相似的——消息被轮询的速度快于它们被处理的速度。这会导致创建大量有效的 MessageHandlerTask 实例,如果有消息积压,这些实例会填满堆内存。

解决方法是创建一个 ThreadPoolExecutor,其 ArrayBlockingQueue 的容量为 100,这样 MessageHandlerTask 及其成员变量的实例数量就会有上限。

找到解决方法后,我继续通过改变 ThreadPoolExecutor 的 maximumPoolSize 来优化管道以获得最大吞吐量。事实证明,在更高的线程数下会发生一些 SQS 连接异常。进一步调查表明,增加 SQS 连接池大小可以改善此问题。 对于给定的 1.5G Xmx 堆大小和 80 SQS 连接池大小,我最终确定了 40 个线程的数量,以便任务线程在处理时不会 运行 超出 SQS 连接。这帮助我仅使用一个数据管道实例就实现了 44 messages/s 的吞吐量。

我还发现了为什么 batchpipeline 在生产中从未崩溃,尽管遭受了类似的 ExecutorService 实现 - 事实证明数据管道可能因太多并发 S3 上传而受到压力,但 batchpipeline 的消息是由数据管道逐渐生成的时尚。此外,batchpipeline 具有更高的吞吐量,我在使用 70 maximumPoolSize 时以 347 messages/s 为基准。