Java 中的任务执行者 returns 第一个结束任务
Task executor in Java that returns first ending task
我想实现一些可以获取工作人员(可调用对象)集合的东西,运行 它在线程池上并行运行,当最快的工作人员 returns 结果正常关闭时(ExecutorService.shutdownNow) 其他工人,以免浪费更多资源。如果所有工作人员都以异常结束,我需要重新抛出最重要的异常(工作人员抛出的所有异常都与 importance
值相关联)。此外,我需要在整个执行程序上设置一个超时,如果他们 运行 时间太长,这将终止所有工作人员。
我考虑过为此使用 RxJava,因为感觉在这里可以实现一个简洁而漂亮的解决方案。但也许您可以为此想出一些更好的工具(CompletableFutures、ForkJoinTasks?)。这是我已经编写的代码,但它远不是一个有效的解决方案(我对反应式编程没有真正的经验,因此真的很挣扎):
public T run(Collection<? extends Worker<T>> workers, long timeout) {
ExecutorService executorService = Executors.newFixedThreadPool(workers.size());
return Observable.from(workers)
.timeout(timeout, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.from(executorService))
.map(worker -> {
try {
T res = worker.call();
executorService.shutdownNow();
return res;
} catch (Exception e) {
throw Exceptions.propagate(e);
}
}).doOnError(Exceptions::propagate).toBlocking().first();
如有任何帮助,我将不胜感激。
非常有趣的技术挑战,感谢您的提问。这是使用 CompletableFuture
代替 Java8 的解决方案。在 Java7 中,您可以以完全相同的方式使用 io.netty.util.concurrent.Promise
。
最简单的部分是处理正常情况:
- 创造一个完整的未来
- 安排任务
- return未来
- 第一个完成的完成未来,其他的被忽略(如果没有被杀死那么原子布尔控制它们不会覆盖该值)
- 未来下一阶段关闭executor服务
更棘手的部分是当每个单独的抛出保持相同的逻辑流程时异常完成。这是通过累积所有异常并在计数达到最后一个失败作业中的作业计数时异常完成未来来解决的。传递的异常是按排名排序的列表中的第一个(这里将是最小的排名,相应地改变)。异常将在调用 future.get()
时可用并包装到 ExecutionException
.
最后,因为你得到了未来,你可以将超时值传递给 get
方法。
所以这是实际可行的解决方案,异常 class 和测试如下:
public <R> CompletableFuture<R> execute(Collection<? extends Callable<R>> jobs) {
final CompletableFuture<R> result = new CompletableFuture<>();
if (jobs == null || jobs.isEmpty()) {
result.completeExceptionally(new IllegalArgumentException("there must be at least one job"));
return result;
}
final ExecutorService service = Executors.newFixedThreadPool(jobs.size());
// accumulate all exceptions to rank later (only if all throw)
final List<RankedException> exceptions = Collections.synchronizedList(Lists.newArrayList());
final AtomicBoolean done = new AtomicBoolean(false);
for (Callable<R> job: jobs) {
service.execute(() -> {
try {
// this is where the actual work is done
R res = job.call();
// set result if still unset
if (done.compareAndSet(false, true)) {
// complete the future, move to service shutdown
result.complete(res);
}
// beware of catching Exception, change to your own checked type
} catch (Exception ex) {
if (ex instanceof RankedException) {
exceptions.add((RankedException) ex);
} else {
exceptions.add(new RankedException(ex));
}
if (exceptions.size() >= jobs.size()) {
// the last to throw and only if all have thrown will run:
Collections.sort(exceptions, (left, right) -> Integer.compare(left.rank, right.rank));
// complete the future, move to service shutdown
result.completeExceptionally(exceptions.get(0));
}
}
});
}
// shutdown also on error, do not wait for this stage
result.whenCompleteAsync((action, t) -> service.shutdownNow());
return result;
}
RankedExeption
:
public static class RankedException extends Exception {
private final int rank;
public RankedException(Throwable t) {
this(0, t);
}
public RankedException(int rank, Throwable t) {
super(t);
this.rank = rank;
}
}
现在有两个测试,成功案例和失败案例(有点简化,但仍然如此):
@Rule
public ExpectedException exception = ExpectedException.none();
private static class TestJob implements Callable<Double> {
private final int index;
private final int failOnCount;
TestJob(int index, int failOnCount) {
this.index = index;
this.failOnCount = failOnCount;
}
@Override
public Double call() throws RankedException {
double res = 0;
int count = (int) (Math.random() * 1e6) + 1;
if (count > failOnCount) {
throw new RankedException(count, new RuntimeException("job " + index + " failed"));
}
for (int i = 0; i < count; i++) {
res += Math.random();
}
return res;
}
}
@Test
public void test_success() throws Exception {
List<TestJob> jobs = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
jobs.add(new TestJob(i, (int)(5*1e5))); // 50% should be alright
}
CompletableFuture<Double> res = execute(jobs);
logger.info("SUCCESS-TEST completed with " + res.get(30, TimeUnit.SECONDS));
}
@Test
public void test_failure() throws Exception {
List<TestJob> jobs = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
jobs.add(new TestJob(i, 0)); // all should fail
}
CompletableFuture<Double> res = execute(jobs);
exception.expect(ExecutionException.class);
try {
res.get(30, TimeUnit.SECONDS);
} catch (ExecutionException ex) {
logger.severe(String.format("FAIL-TEST rank: %s", ((RankedException) ex.getCause()).rank));
throw ex;
}
}
测试运行的最终截断输出:
INFO: SUCCESS-TEST completed with 115863.20802680103
SEVERE: FAIL-TEST rank: 388150
Process finished with exit code 0
注意:您可能希望通过 AtomicBoolean
进一步发出信号,以便在第一个线程就绪时真正向所有线程发出终止信号
我不保证上面的代码没有错误,因为它是匆忙完成的并且测试是基本的。意在指明进一步挖掘的方向。
您是否研究过 RxJava AMB 运算符?但是,您需要验证它是否在第一个 onComplete 时完成,因为文档对此没有任何说明。
RxJava 的完美应用。要获得并行操作,请在 flatMap
中使用 flatMap
和 subscribeOn
。要拾取错误,请使用 materialize
,并在成功返回值后立即停止,请使用 takeUntil
。使用 timeout
运算符满足您的超时要求。
ExecutorService executorService =
Executors.newFixedThreadPool(workers.size());
Scheduler scheduler = Schedulers.from(executorService);
return Observable
.from(workers)
.flatMap(worker ->
Observable.fromCallable(worker)
.subscribeOn(scheduler)
.materialize())
.takeUntil(notification -> notification.hasValue())
.toList()
.timeout(30, TimeUnit.SECONDS)
.flatMap(
list -> {
Notification<T> last = list.get(list.size() - 1);
if (last.hasValue())
return Observable.just(last.getValue());
else {
// TODO get the error notification from the list
// with the highest importance and emit
return Observable.<T>error(err);
}
}).subscribe(subscriber);
我想实现一些可以获取工作人员(可调用对象)集合的东西,运行 它在线程池上并行运行,当最快的工作人员 returns 结果正常关闭时(ExecutorService.shutdownNow) 其他工人,以免浪费更多资源。如果所有工作人员都以异常结束,我需要重新抛出最重要的异常(工作人员抛出的所有异常都与 importance
值相关联)。此外,我需要在整个执行程序上设置一个超时,如果他们 运行 时间太长,这将终止所有工作人员。
我考虑过为此使用 RxJava,因为感觉在这里可以实现一个简洁而漂亮的解决方案。但也许您可以为此想出一些更好的工具(CompletableFutures、ForkJoinTasks?)。这是我已经编写的代码,但它远不是一个有效的解决方案(我对反应式编程没有真正的经验,因此真的很挣扎):
public T run(Collection<? extends Worker<T>> workers, long timeout) {
ExecutorService executorService = Executors.newFixedThreadPool(workers.size());
return Observable.from(workers)
.timeout(timeout, TimeUnit.MILLISECONDS)
.subscribeOn(Schedulers.from(executorService))
.map(worker -> {
try {
T res = worker.call();
executorService.shutdownNow();
return res;
} catch (Exception e) {
throw Exceptions.propagate(e);
}
}).doOnError(Exceptions::propagate).toBlocking().first();
如有任何帮助,我将不胜感激。
非常有趣的技术挑战,感谢您的提问。这是使用 CompletableFuture
代替 Java8 的解决方案。在 Java7 中,您可以以完全相同的方式使用 io.netty.util.concurrent.Promise
。
最简单的部分是处理正常情况:
- 创造一个完整的未来
- 安排任务
- return未来
- 第一个完成的完成未来,其他的被忽略(如果没有被杀死那么原子布尔控制它们不会覆盖该值)
- 未来下一阶段关闭executor服务
更棘手的部分是当每个单独的抛出保持相同的逻辑流程时异常完成。这是通过累积所有异常并在计数达到最后一个失败作业中的作业计数时异常完成未来来解决的。传递的异常是按排名排序的列表中的第一个(这里将是最小的排名,相应地改变)。异常将在调用 future.get()
时可用并包装到 ExecutionException
.
最后,因为你得到了未来,你可以将超时值传递给 get
方法。
所以这是实际可行的解决方案,异常 class 和测试如下:
public <R> CompletableFuture<R> execute(Collection<? extends Callable<R>> jobs) {
final CompletableFuture<R> result = new CompletableFuture<>();
if (jobs == null || jobs.isEmpty()) {
result.completeExceptionally(new IllegalArgumentException("there must be at least one job"));
return result;
}
final ExecutorService service = Executors.newFixedThreadPool(jobs.size());
// accumulate all exceptions to rank later (only if all throw)
final List<RankedException> exceptions = Collections.synchronizedList(Lists.newArrayList());
final AtomicBoolean done = new AtomicBoolean(false);
for (Callable<R> job: jobs) {
service.execute(() -> {
try {
// this is where the actual work is done
R res = job.call();
// set result if still unset
if (done.compareAndSet(false, true)) {
// complete the future, move to service shutdown
result.complete(res);
}
// beware of catching Exception, change to your own checked type
} catch (Exception ex) {
if (ex instanceof RankedException) {
exceptions.add((RankedException) ex);
} else {
exceptions.add(new RankedException(ex));
}
if (exceptions.size() >= jobs.size()) {
// the last to throw and only if all have thrown will run:
Collections.sort(exceptions, (left, right) -> Integer.compare(left.rank, right.rank));
// complete the future, move to service shutdown
result.completeExceptionally(exceptions.get(0));
}
}
});
}
// shutdown also on error, do not wait for this stage
result.whenCompleteAsync((action, t) -> service.shutdownNow());
return result;
}
RankedExeption
:
public static class RankedException extends Exception {
private final int rank;
public RankedException(Throwable t) {
this(0, t);
}
public RankedException(int rank, Throwable t) {
super(t);
this.rank = rank;
}
}
现在有两个测试,成功案例和失败案例(有点简化,但仍然如此):
@Rule
public ExpectedException exception = ExpectedException.none();
private static class TestJob implements Callable<Double> {
private final int index;
private final int failOnCount;
TestJob(int index, int failOnCount) {
this.index = index;
this.failOnCount = failOnCount;
}
@Override
public Double call() throws RankedException {
double res = 0;
int count = (int) (Math.random() * 1e6) + 1;
if (count > failOnCount) {
throw new RankedException(count, new RuntimeException("job " + index + " failed"));
}
for (int i = 0; i < count; i++) {
res += Math.random();
}
return res;
}
}
@Test
public void test_success() throws Exception {
List<TestJob> jobs = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
jobs.add(new TestJob(i, (int)(5*1e5))); // 50% should be alright
}
CompletableFuture<Double> res = execute(jobs);
logger.info("SUCCESS-TEST completed with " + res.get(30, TimeUnit.SECONDS));
}
@Test
public void test_failure() throws Exception {
List<TestJob> jobs = Lists.newArrayList();
for (int i = 0; i < 10; i++) {
jobs.add(new TestJob(i, 0)); // all should fail
}
CompletableFuture<Double> res = execute(jobs);
exception.expect(ExecutionException.class);
try {
res.get(30, TimeUnit.SECONDS);
} catch (ExecutionException ex) {
logger.severe(String.format("FAIL-TEST rank: %s", ((RankedException) ex.getCause()).rank));
throw ex;
}
}
测试运行的最终截断输出:
INFO: SUCCESS-TEST completed with 115863.20802680103
SEVERE: FAIL-TEST rank: 388150
Process finished with exit code 0
注意:您可能希望通过 AtomicBoolean
进一步发出信号,以便在第一个线程就绪时真正向所有线程发出终止信号
我不保证上面的代码没有错误,因为它是匆忙完成的并且测试是基本的。意在指明进一步挖掘的方向。
您是否研究过 RxJava AMB 运算符?但是,您需要验证它是否在第一个 onComplete 时完成,因为文档对此没有任何说明。
RxJava 的完美应用。要获得并行操作,请在 flatMap
中使用 flatMap
和 subscribeOn
。要拾取错误,请使用 materialize
,并在成功返回值后立即停止,请使用 takeUntil
。使用 timeout
运算符满足您的超时要求。
ExecutorService executorService =
Executors.newFixedThreadPool(workers.size());
Scheduler scheduler = Schedulers.from(executorService);
return Observable
.from(workers)
.flatMap(worker ->
Observable.fromCallable(worker)
.subscribeOn(scheduler)
.materialize())
.takeUntil(notification -> notification.hasValue())
.toList()
.timeout(30, TimeUnit.SECONDS)
.flatMap(
list -> {
Notification<T> last = list.get(list.size() - 1);
if (last.hasValue())
return Observable.just(last.getValue());
else {
// TODO get the error notification from the list
// with the highest importance and emit
return Observable.<T>error(err);
}
}).subscribe(subscriber);