Java线程:过度CPU利用率
Java Threading: Excessive CPU Utilization
我正在使用从 Kafka
读取消息并将其推送到 Cassandra
的服务。
我正在使用线程架构。
据说,k threads
从 Kafka 主题消费。这些写入队列,声明为:
public static BlockingQueue<>
现在有许多线程,比如 n
,写入 Cassandra。这是执行此操作的代码:
public void run(){
LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName());
while (!Thread.currentThread().isInterrupted()) {
Thread.yield();
if (!content.isEmpty()) {
try {
JSONObject msg = content.remove();
// JSON
for(String tableName : tableList){
CassandraConnector.getSession().execute(createQuery(tableName, msg));
}
} catch (Exception e) {
}
}
}
}
content
是用于读写操作的BlockingQueue
我正在扩展 Thread
class 线程的实现,并且有固定数量的线程继续执行,除非被中断。
问题是,这使用了太多 CPU。这是 top
命令的第一行:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
46232 vishran+ 20 0 3010804 188052 14280 S 137.8 3.3 5663:24 java
这里是 strace
在此进程的一个线程上的输出:
strace -t -p 46322
Process 46322 attached
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
....and so on
为什么我使用 Thread.yield()
,是因为 this
如果您需要任何其他调试信息,请告诉我。
现在的问题是,如何才能将 CPU 利用率降至最低?
从您的代码来看,您的消费者线程似乎总是在检查可用内容。因此,您的线程总是 运行 并且从不空闲(等待有人通知它们),因此您的 CPU 总是在做某事,即使它总是让线程成为当前线程。
while (!Thread.currentThread().isInterrupted()) {
Thread.yield();
if (!content.isEmpty()) {
您显然正在努力解决我们许多人在编程生涯中遇到的生产者-消费者问题。
您目前正在做的是让消费者主动不断地检查它是否有东西要消费。
最简单也是最简单的 CPU 密集解决方法是:
- 让生产者向消费者发出信号,表明它已经生产了一些东西。
查看 this example as it contains a simplest way to do it. You may want to revisit Java Concurrency in Practice 以获得更深刻的帮助。
BlockingQueue 的全部目的是当它为空时阻塞。因此消费者线程(填充到 Cassandra 中的线程)不必手动检查它们是否为空。您可以只调用 take() ,如果队列为空,则调用将阻塞,除非它被中断或有可用元素。
当一个线程被阻塞时,调度程序可以在它的位置调度一些其他线程,从而避免调用 yield() 等。请记住,仅当优先级大于或等于正在让出的线程的线程可用于 运行.
时,yield() 才会让位给另一个线程。
public void run(){
LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName());
try {
JSONObject msg = content.take();
// JSON
for(String tableName : tableList){
CassandraConnector.getSession().execute(createQuery(tableName, msg));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
如其他答案中所述,您正在执行忙等待,而不是使用 content
BlockingQueue 的核心功能:等待下一个条目并将其从队列中删除。这是使用 take()
方法完成的:
while (!Thread.currentThread().isInterrupted()) {
try {
JSONObject msg = content.take();
for(String tableName : tableList){
CassandraConnector.getSession().execute(createQuery(tableName, msg));
}
} catch (Exception e) {
}
}
我正在使用从 Kafka
读取消息并将其推送到 Cassandra
的服务。
我正在使用线程架构。
据说,k threads
从 Kafka 主题消费。这些写入队列,声明为:
public static BlockingQueue<>
现在有许多线程,比如 n
,写入 Cassandra。这是执行此操作的代码:
public void run(){
LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName());
while (!Thread.currentThread().isInterrupted()) {
Thread.yield();
if (!content.isEmpty()) {
try {
JSONObject msg = content.remove();
// JSON
for(String tableName : tableList){
CassandraConnector.getSession().execute(createQuery(tableName, msg));
}
} catch (Exception e) {
}
}
}
}
content
是用于读写操作的BlockingQueue
我正在扩展 Thread
class 线程的实现,并且有固定数量的线程继续执行,除非被中断。
问题是,这使用了太多 CPU。这是 top
命令的第一行:
PID USER PR NI VIRT RES SHR S %CPU %MEM TIME+ COMMAND
46232 vishran+ 20 0 3010804 188052 14280 S 137.8 3.3 5663:24 java
这里是 strace
在此进程的一个线程上的输出:
strace -t -p 46322
Process 46322 attached
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
15:18:47 sched_yield() = 0
....and so on
为什么我使用 Thread.yield()
,是因为 this
如果您需要任何其他调试信息,请告诉我。
现在的问题是,如何才能将 CPU 利用率降至最低?
从您的代码来看,您的消费者线程似乎总是在检查可用内容。因此,您的线程总是 运行 并且从不空闲(等待有人通知它们),因此您的 CPU 总是在做某事,即使它总是让线程成为当前线程。
while (!Thread.currentThread().isInterrupted()) {
Thread.yield();
if (!content.isEmpty()) {
您显然正在努力解决我们许多人在编程生涯中遇到的生产者-消费者问题。
您目前正在做的是让消费者主动不断地检查它是否有东西要消费。
最简单也是最简单的 CPU 密集解决方法是:
- 让生产者向消费者发出信号,表明它已经生产了一些东西。
查看 this example as it contains a simplest way to do it. You may want to revisit Java Concurrency in Practice 以获得更深刻的帮助。
BlockingQueue 的全部目的是当它为空时阻塞。因此消费者线程(填充到 Cassandra 中的线程)不必手动检查它们是否为空。您可以只调用 take() ,如果队列为空,则调用将阻塞,除非它被中断或有可用元素。
当一个线程被阻塞时,调度程序可以在它的位置调度一些其他线程,从而避免调用 yield() 等。请记住,仅当优先级大于或等于正在让出的线程的线程可用于 运行.
时,yield() 才会让位给另一个线程。public void run(){
LOGGER.log(Level.INFO, "Thread Created: " +Thread.currentThread().getName());
try {
JSONObject msg = content.take();
// JSON
for(String tableName : tableList){
CassandraConnector.getSession().execute(createQuery(tableName, msg));
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
如其他答案中所述,您正在执行忙等待,而不是使用 content
BlockingQueue 的核心功能:等待下一个条目并将其从队列中删除。这是使用 take()
方法完成的:
while (!Thread.currentThread().isInterrupted()) {
try {
JSONObject msg = content.take();
for(String tableName : tableList){
CassandraConnector.getSession().execute(createQuery(tableName, msg));
}
} catch (Exception e) {
}
}