Java "Tiered Queue" 快速生产者、慢速消费者的实施

Java "Tiered Queue" implementation for fast Producers, slow Consumers

我有一个生产者-消费者场景,其中生产者的生产速度比消费者的消费速度快得多。通常,解决方案是让生产者阻塞,因为 producer/consumer 场景的运行速度与最慢的组件一样快。限制或阻止生产者不是一个好的解决方案,因为我们的应用程序为消费者提供了足够的时间来稍后赶上。

这是一个图表,描述了我们应用程序中的完整 "phase" 与更常见的场景:

      Our Application                 Common Scenario
 2N +--------+--------+
    |PPPPPPPP|oooooooo|                                         P = Producer
    |PPPPPPPP|oooooooo|                                         C = Consumer
  N +--------+--------+      N +--------+--------+--------+     o = Other Work
    |CPCPCPCP|CCCCCCCC|        |CPCPCPCP|CPCPCPCP|oooooooo|     N = number of tasks
    |CPCPCPCP|CCCCCCCC|        |CPCPCPCP|CPCPCPCP|oooooooo|
    -------------------        ----------------------------
    0       T/2       T        0       T/2       T      3T/2

这个想法是通过不抑制生产者来最大化吞吐量。

我们的任务操作的数据很容易序列化,所以我打算实现一个文件系统解决方案来溢出所有不能立即满足的任务。

我正在使用 Java 的 ThreadPoolExecutor 和具有最大容量的 BlockingQueue 以确保我们不会 运行 内存不足。问题在于实现这样一个 "tiered" 队列,其中可以在内存中排队的任务会立即完成,否则数据会在磁盘上排队。

我想出了两个可能的解决方案:

  1. 从头开始实施 BlockingQueue,使用 LinkedBlockingQueueArrayBlockingQueue 实施作为参考。这可能就像复制标准库中的实现并添加文件系统一样简单 read/writes.
  2. 继续使用标准 BlockingQueue 实现,实现一个单独的 FilesystemQueue 来存储我的数据,并使用一个或多个线程使文件出列,创建 Runnable 并使用ThreadPoolExecutor.

这些是否合理,是否有可能有更好的方法?

这听起来像是使用 JMS 队列而不是文件系统的理想情况。

不使用阻塞队列,post 持久 JMS 队列上的消息。您仍然可以尝试分层方法,在 BlockingQueue 已满时将 JMS 队列与 BlockingQueue、post 并行组合到 JMS 队列,但我确信纯 JMS 方法可以正常工作自己。

在寻求更复杂的解决方案之前,您是否真的确信使用有界 BlockingQueue 会破坏您的交易?事实证明,增加堆大小和预分配足够大的容量对您来说仍然可以。它将允许您避免复杂性和性能不确定性,代价是您舒适范围内的 GC 暂停。

不过,如果您的工作负载如此不平衡,以至于它可以利用持久化内存无法容纳的大量消息(与经过验证的 MPMC 阻塞队列相比),听起来您需要一个更简单、更小的版本ActiveMQ or its Apollo 外拍。根据您的应用程序,您可能会发现 ActiveMQ 的其他功能很有用,在这种情况下您可以直接使用它。如果没有,您最好搜索 JMS space,正如 bowmore 所建议的那样。

第一个选项增加可用的堆space大小 ,正如Dimitar Dimitrov所建议,使用内存标志-Xmx,例如java -Xmx2048m

From Oracle's Documentation: Note that the JVM uses more memory than just the heap. For example Java methods, thread stacks and native handles are allocated in memory separate from the heap, as well as JVM internal data structures.

这也是java 堆内存如何分类的示意图。


第二个选项 是使用实现所请求功能的库。为此,您可以使用 ashes-queue

From project's overview: This is a simple FIFO implementation in Java which has persistent support. That is, if the queue is full, the overflowing messages will be persisted and when there are available slots, they will be put back into memory.


第三个选项创建您自己的实现。为此,您可以预览 this thread 以指导您实现该目的。

您的建议包含在最后的第三个选项中。两者都是合理的。从实现的角度来看,您应该选择第一个选项,因为它将保证更容易实现和简洁的设计。