Java 只有一个消费者和生产者线程的并发队列

Concurrent queue with only one consumer and producer threads for Java

我在应用程序中有原始消息系统。消息可以由生产者从一个线程提交并由消费者在另一个线程中处理 - 按照设计只有 两个 线程:一个线程用于消费者,另一个线程用于生产者,并且它是无法更改此逻辑。

我正在使用 ConcurrentLinkedQueue<> 实现来处理消息:

// producer's code (adds the request)
this.queue.add(req);

// consumer's code (busy loop with request polling)
while (true) {
  Request req = this.queue.poll();
  if (req == null) {
    continue;
  }
  if (req.last()) {
    // last request submitted by consumer
    return;
  }
  // function to process the request
  this.process(req);
}

处理逻辑非常快,消费者每秒可能收到大约 X_000_000 个请求。

但我发现使用探查器时 queue.poll() 有时速度很慢(似乎是在队列从生产者那里接收大量新项目时)- 在接收大量新消息时速度大约慢 10 倍与没有从另一个线程添加新项目的已经填满的队列进行比较。

可以优化吗?对于这种特殊情况,最好的 Queue<> 实现是什么(一个线程用于 poll(),一个线程用于 add())?也许自己实现一些简单的队列会更容易?

消费者在生产者生产时速度较慢,因为每次读取时都会遇到缓存未命中,因为新元素始终存在。 如果所有元素都已经存在,则可以将它们一起获取,从而提高吞吐量。

忙等待时考虑使用Thread.onSpinWait():虽然它增加了延迟,但它也实现了某些性能优化。

// consumer's code (busy loop with request polling)
while (true) {
  Request req = this.queue.poll();
  if (req == null) {
    Thread.onSpinWait();
    continue;
  }
  if (req.last()) {
    // last request submitted by consumer
    return;
  }
  // function to process the request
  this.process(req);
}

JDK 没有针对 SPSC(单生产者单消费者)场景优化的队列。有图书馆。您可以使用 Agrona or JCTools。实现这些并不容易。

// Agrona
Queue<Request> queue = new OneToOneConcurrentArrayQueue<>(2048);
// JCTools
Queue<Request> queue = new SpscArrayQueue<>(2048);