Java 只有一个消费者和生产者线程的并发队列
Concurrent queue with only one consumer and producer threads for Java
我在应用程序中有原始消息系统。消息可以由生产者从一个线程提交并由消费者在另一个线程中处理 - 按照设计只有 两个 线程:一个线程用于消费者,另一个线程用于生产者,并且它是无法更改此逻辑。
我正在使用 ConcurrentLinkedQueue<>
实现来处理消息:
// producer's code (adds the request)
this.queue.add(req);
// consumer's code (busy loop with request polling)
while (true) {
Request req = this.queue.poll();
if (req == null) {
continue;
}
if (req.last()) {
// last request submitted by consumer
return;
}
// function to process the request
this.process(req);
}
处理逻辑非常快,消费者每秒可能收到大约 X_000_000
个请求。
但我发现使用探查器时 queue.poll()
有时速度很慢(似乎是在队列从生产者那里接收大量新项目时)- 在接收大量新消息时速度大约慢 10 倍与没有从另一个线程添加新项目的已经填满的队列进行比较。
可以优化吗?对于这种特殊情况,最好的 Queue<>
实现是什么(一个线程用于 poll()
,一个线程用于 add()
)?也许自己实现一些简单的队列会更容易?
消费者在生产者生产时速度较慢,因为每次读取时都会遇到缓存未命中,因为新元素始终存在。
如果所有元素都已经存在,则可以将它们一起获取,从而提高吞吐量。
忙等待时考虑使用Thread.onSpinWait()
:虽然它增加了延迟,但它也实现了某些性能优化。
// consumer's code (busy loop with request polling)
while (true) {
Request req = this.queue.poll();
if (req == null) {
Thread.onSpinWait();
continue;
}
if (req.last()) {
// last request submitted by consumer
return;
}
// function to process the request
this.process(req);
}
JDK 没有针对 SPSC(单生产者单消费者)场景优化的队列。有图书馆。您可以使用 Agrona or JCTools。实现这些并不容易。
// Agrona
Queue<Request> queue = new OneToOneConcurrentArrayQueue<>(2048);
// JCTools
Queue<Request> queue = new SpscArrayQueue<>(2048);
我在应用程序中有原始消息系统。消息可以由生产者从一个线程提交并由消费者在另一个线程中处理 - 按照设计只有 两个 线程:一个线程用于消费者,另一个线程用于生产者,并且它是无法更改此逻辑。
我正在使用 ConcurrentLinkedQueue<>
实现来处理消息:
// producer's code (adds the request)
this.queue.add(req);
// consumer's code (busy loop with request polling)
while (true) {
Request req = this.queue.poll();
if (req == null) {
continue;
}
if (req.last()) {
// last request submitted by consumer
return;
}
// function to process the request
this.process(req);
}
处理逻辑非常快,消费者每秒可能收到大约 X_000_000
个请求。
但我发现使用探查器时 queue.poll()
有时速度很慢(似乎是在队列从生产者那里接收大量新项目时)- 在接收大量新消息时速度大约慢 10 倍与没有从另一个线程添加新项目的已经填满的队列进行比较。
可以优化吗?对于这种特殊情况,最好的 Queue<>
实现是什么(一个线程用于 poll()
,一个线程用于 add()
)?也许自己实现一些简单的队列会更容易?
消费者在生产者生产时速度较慢,因为每次读取时都会遇到缓存未命中,因为新元素始终存在。 如果所有元素都已经存在,则可以将它们一起获取,从而提高吞吐量。
忙等待时考虑使用Thread.onSpinWait()
:虽然它增加了延迟,但它也实现了某些性能优化。
// consumer's code (busy loop with request polling)
while (true) {
Request req = this.queue.poll();
if (req == null) {
Thread.onSpinWait();
continue;
}
if (req.last()) {
// last request submitted by consumer
return;
}
// function to process the request
this.process(req);
}
JDK 没有针对 SPSC(单生产者单消费者)场景优化的队列。有图书馆。您可以使用 Agrona or JCTools。实现这些并不容易。
// Agrona
Queue<Request> queue = new OneToOneConcurrentArrayQueue<>(2048);
// JCTools
Queue<Request> queue = new SpscArrayQueue<>(2048);