线程池并在计数器达到 X 时停止执行 for 循环
Thread pools and stopping execution of a for loop when a counter reaches X
我对线程池和提供 for 循环的退出条件有点困惑。关于如何正确执行此操作,我还没有找到合适的解释。我一直在尝试一些可能性,但我被卡住了
我有这段代码。
@Override
@Transactional(propagation=Propagation.REQUIRED, rollbackFor=Throwable.class)
public void auditAllDomainConfigurationStatuses() {
logger.info("Starting audit of all domain configuration statusses");
int errorStatusCounter = 0;
Map<SubdomainRegistryStatus, List<String>> domainsByStatus = new HashMap<SubdomainRegistryStatus, List<String>>();
List<DomainConfigurationStatus> domains = domainConfigurationStatusDao.findAll();
for (DomainConfigurationStatus domainConfigurationStatus : domains) {
String domainName = domainConfigurationStatus.getDomainName();
DomainConfigurationStatus result = domainConfigurationStatusAuditor.auditDomainConfigurationStatus(domainConfigurationStatus.getId());
addDomainToDomainsByStatusMap(domainsByStatus, result, domainName);
if(SubdomainRegistryStatus.ERROR.equals(result.getStatus())){
errorStatusCounter++;
if(errorStatusCounter >= EMERGENCY_AUDIT_STOP_LIMIT){
logger.error("Emergency audit stop more then " + EMERGENCY_AUDIT_STOP_LIMIT + " records went into status ERROR");
mailEmergencyDomainConfigurationStatusAuditStop();
return;
}
}else{
errorStatusCounter = 0;
}
}
mailDomainConfigurationStatusReport(domainsByStatus);
logger.info("Audit of all domain configuration statusses completed");
}
此代码将在某处调用域的 dns 来获取它的 ip。然后它将更新数据库中的状态。很简单的事情。然而,如果 X 次之后状态转换为 ERROR,业务希望我们停止整个过程。我设法写了这个,用上面的方法很简单。但是调用 dns 获取 ip 很慢,我每秒可以处理大约 6 个域。我们必须处理超过 32 000 个域。我们需要提高性能,建议使用多线程。
所以我开始编写任务,在 spring 中创建线程池等...然后我意识到等待 EMERGENCY_AUDIT_STOP_LIMIT 如果计数器运行在多个线程上,我怎么还能这样做。 ..没有任何回调。所以我尝试使用 Callable 而不是 Runnable 所以我正在使用 Future,然后我得出的结论是我在想,未来将阻止它的 future.get() 方法所以我要结束up with 是一种与我的原始实现一样慢或更慢的方法。
到目前为止,这是我的道路,我现在有点受阻,Runnable 不能抛出异常,因此将计数器传递给任务也不起作用,Callable 将受阻,因此也没有选择.
如果有任何多线程专家有想法,我将不胜感激。下面是我最近的尝试,它没有坏,但和我上面的方法一样慢。
@Override
@Transactional(propagation=Propagation.REQUIRED, rollbackFor=Throwable.class)
public void auditAllDomainConfigurationStatuses() throws InterruptedException, ExecutionException {
logger.info("Starting audit of all domain configuration statusses");
int errorStatusCounter = 0;
Map<SubdomainRegistryStatus, List<String>> domainsByStatus = new HashMap<SubdomainRegistryStatus, List<String>>();
List<DomainConfigurationStatus> domains = domainConfigurationStatusDao.findAll();
for (DomainConfigurationStatus domainConfigurationStatus : domains) {
try {
Future<Integer> futureResult = taskExecutor.submit(new DomainConfigurationAuditTask(errorStatusCounter, domainConfigurationStatusAuditor, domainConfigurationStatus.getId(), domainsByStatus, EMERGENCY_AUDIT_STOP_LIMIT));
futureResult.get();
}
catch (Exception e) {
logger.error("Emergency audit stop more then " + EMERGENCY_AUDIT_STOP_LIMIT + " records went into status ERROR");
mailEmergencyDomainConfigurationStatusAuditStop();
return;
}
}
mailDomainConfigurationStatusReport(domainsByStatus);
logger.info("Audit of all domain configuration statusses completed");
}
这是一个非常简单的解决方案。基本上,完成某些工作(即 DNS 查找)的任务是完全隔离和可并行的。它在成功或失败后的部分工作是将成功布尔值提交给另一个 固定大小为 1 的 ExecutoService
,它可以执行您想要的任何错误条件检查。
在这种情况下,它只是简单地递增一个带有连续错误的整数,直到达到最大条件,然后设置一个错误条件,工作线程(DNS 查找)首先检查该错误条件以实现快速失败方法,因此所有排队的任务会在遇到错误条件后快速退出。
这最终成为在像这样的多线程场景中跟踪连续错误的非常简单的方法,因为您正在对响应进行单线程检查
我可以想到使用 Java 8 的 CompletableFuture
的更优雅的解决方案,但听起来好像不符合 table
package so.thread.errcondition;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class Main {
public static Random rand = new Random();
public static ExecutorService workers = Executors.newFixedThreadPool(5);
// NOTE, this executor has a FIXED size of 1 for in-order processing
public static ExecutorService watcher = Executors.newFixedThreadPool(1);
public static AtomicBoolean errorCondition = new AtomicBoolean(false);
public static AtomicInteger errorCount = new AtomicInteger(0);
public static Integer MAX_ERRORS = 5;
public static void main(String[] args) throws Exception {
int jobs = 1000;
for (int i = 0; i < jobs; i++) {
workers.submit(getWork());
}
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
}
// parallelizable task, the number of parallel workers is irrelevant
public static Runnable getWork() {
return new Runnable() {
@Override
public void run() {
// fail fast
if (errorCondition.get()) {
System.out.println("%%% MAX_ERRORS of [" + MAX_ERRORS + "] occurred, skipping task");
return;
}
// do work
if (rand.nextBoolean()) {
// GOOD JOB
System.out.println("+++ GOOD RESULT");
submitDoneTask(true);
} else {
// ERROR
System.out.println("*** BAD RESULT");
submitDoneTask(false);
}
}
};
}
public static void submitDoneTask(final boolean success) {
watcher.submit(new Runnable() {
@Override
public void run() {
if (!errorCondition.get() && success) {
errorCount.set(0);
} else {
int errors = errorCount.incrementAndGet();
if (errors >= MAX_ERRORS) {
errorCondition.set(true);
}
}
}
});
}
}
我对线程池和提供 for 循环的退出条件有点困惑。关于如何正确执行此操作,我还没有找到合适的解释。我一直在尝试一些可能性,但我被卡住了
我有这段代码。
@Override
@Transactional(propagation=Propagation.REQUIRED, rollbackFor=Throwable.class)
public void auditAllDomainConfigurationStatuses() {
logger.info("Starting audit of all domain configuration statusses");
int errorStatusCounter = 0;
Map<SubdomainRegistryStatus, List<String>> domainsByStatus = new HashMap<SubdomainRegistryStatus, List<String>>();
List<DomainConfigurationStatus> domains = domainConfigurationStatusDao.findAll();
for (DomainConfigurationStatus domainConfigurationStatus : domains) {
String domainName = domainConfigurationStatus.getDomainName();
DomainConfigurationStatus result = domainConfigurationStatusAuditor.auditDomainConfigurationStatus(domainConfigurationStatus.getId());
addDomainToDomainsByStatusMap(domainsByStatus, result, domainName);
if(SubdomainRegistryStatus.ERROR.equals(result.getStatus())){
errorStatusCounter++;
if(errorStatusCounter >= EMERGENCY_AUDIT_STOP_LIMIT){
logger.error("Emergency audit stop more then " + EMERGENCY_AUDIT_STOP_LIMIT + " records went into status ERROR");
mailEmergencyDomainConfigurationStatusAuditStop();
return;
}
}else{
errorStatusCounter = 0;
}
}
mailDomainConfigurationStatusReport(domainsByStatus);
logger.info("Audit of all domain configuration statusses completed");
}
此代码将在某处调用域的 dns 来获取它的 ip。然后它将更新数据库中的状态。很简单的事情。然而,如果 X 次之后状态转换为 ERROR,业务希望我们停止整个过程。我设法写了这个,用上面的方法很简单。但是调用 dns 获取 ip 很慢,我每秒可以处理大约 6 个域。我们必须处理超过 32 000 个域。我们需要提高性能,建议使用多线程。
所以我开始编写任务,在 spring 中创建线程池等...然后我意识到等待 EMERGENCY_AUDIT_STOP_LIMIT 如果计数器运行在多个线程上,我怎么还能这样做。 ..没有任何回调。所以我尝试使用 Callable 而不是 Runnable 所以我正在使用 Future,然后我得出的结论是我在想,未来将阻止它的 future.get() 方法所以我要结束up with 是一种与我的原始实现一样慢或更慢的方法。
到目前为止,这是我的道路,我现在有点受阻,Runnable 不能抛出异常,因此将计数器传递给任务也不起作用,Callable 将受阻,因此也没有选择.
如果有任何多线程专家有想法,我将不胜感激。下面是我最近的尝试,它没有坏,但和我上面的方法一样慢。
@Override
@Transactional(propagation=Propagation.REQUIRED, rollbackFor=Throwable.class)
public void auditAllDomainConfigurationStatuses() throws InterruptedException, ExecutionException {
logger.info("Starting audit of all domain configuration statusses");
int errorStatusCounter = 0;
Map<SubdomainRegistryStatus, List<String>> domainsByStatus = new HashMap<SubdomainRegistryStatus, List<String>>();
List<DomainConfigurationStatus> domains = domainConfigurationStatusDao.findAll();
for (DomainConfigurationStatus domainConfigurationStatus : domains) {
try {
Future<Integer> futureResult = taskExecutor.submit(new DomainConfigurationAuditTask(errorStatusCounter, domainConfigurationStatusAuditor, domainConfigurationStatus.getId(), domainsByStatus, EMERGENCY_AUDIT_STOP_LIMIT));
futureResult.get();
}
catch (Exception e) {
logger.error("Emergency audit stop more then " + EMERGENCY_AUDIT_STOP_LIMIT + " records went into status ERROR");
mailEmergencyDomainConfigurationStatusAuditStop();
return;
}
}
mailDomainConfigurationStatusReport(domainsByStatus);
logger.info("Audit of all domain configuration statusses completed");
}
这是一个非常简单的解决方案。基本上,完成某些工作(即 DNS 查找)的任务是完全隔离和可并行的。它在成功或失败后的部分工作是将成功布尔值提交给另一个 固定大小为 1 的 ExecutoService
,它可以执行您想要的任何错误条件检查。
在这种情况下,它只是简单地递增一个带有连续错误的整数,直到达到最大条件,然后设置一个错误条件,工作线程(DNS 查找)首先检查该错误条件以实现快速失败方法,因此所有排队的任务会在遇到错误条件后快速退出。
这最终成为在像这样的多线程场景中跟踪连续错误的非常简单的方法,因为您正在对响应进行单线程检查
我可以想到使用 Java 8 的 CompletableFuture
的更优雅的解决方案,但听起来好像不符合 table
package so.thread.errcondition;
import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
public class Main {
public static Random rand = new Random();
public static ExecutorService workers = Executors.newFixedThreadPool(5);
// NOTE, this executor has a FIXED size of 1 for in-order processing
public static ExecutorService watcher = Executors.newFixedThreadPool(1);
public static AtomicBoolean errorCondition = new AtomicBoolean(false);
public static AtomicInteger errorCount = new AtomicInteger(0);
public static Integer MAX_ERRORS = 5;
public static void main(String[] args) throws Exception {
int jobs = 1000;
for (int i = 0; i < jobs; i++) {
workers.submit(getWork());
}
Thread.sleep(TimeUnit.SECONDS.toMillis(5));
}
// parallelizable task, the number of parallel workers is irrelevant
public static Runnable getWork() {
return new Runnable() {
@Override
public void run() {
// fail fast
if (errorCondition.get()) {
System.out.println("%%% MAX_ERRORS of [" + MAX_ERRORS + "] occurred, skipping task");
return;
}
// do work
if (rand.nextBoolean()) {
// GOOD JOB
System.out.println("+++ GOOD RESULT");
submitDoneTask(true);
} else {
// ERROR
System.out.println("*** BAD RESULT");
submitDoneTask(false);
}
}
};
}
public static void submitDoneTask(final boolean success) {
watcher.submit(new Runnable() {
@Override
public void run() {
if (!errorCondition.get() && success) {
errorCount.set(0);
} else {
int errors = errorCount.incrementAndGet();
if (errors >= MAX_ERRORS) {
errorCondition.set(true);
}
}
}
});
}
}