终止 ExecutorService 可完成的未来问题
Completable future problem with terminating ExecutorService
做一些基本的网络抓取工具。
我想要 运行 使用 Completable future 与多个线程并行抓取。每个作业检索需要抓取的 Page 对象和 returns 包含已创建 urls 列表的 Page 对象。
列表中的每个 url 如果尚未提交以供抓取,则会开始新作业。完成所有并行作业后,我想继续逻辑。
如果我删除“allFutures.thenRun(() -> executorService.shutdown());”,此代码仅抓取第一页对象然后终止的问题然后它收集所有 pages/urls 但程序永远不会结束。
public class Demo
{
private final Set<Page> pages = new HashSet<>();
private final Set<Page> submittedPages = new HashSet<>();
private final ExecutorService executorService;
public Demo(final int numberOfThreads)
{
this.executorService = Executors.newFixedThreadPool(numberOfThreads);
}
public void start(String url) throws ExecutionException, InterruptedException
{
this.submitTask(new Page(url));
CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]));
allFutures.thenRun(() -> executorService.shutdown());
// do something with pages
}
private void submitTask(final Page page)
{
if (!this.submittedPages.contains(page))
{
this.submittedPages.add(page);
CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> new Task(page).call(), this.executorService) //want to run this parallel in multiple threads
.thenAccept(receivedPage -> {
this.savePage(receivedPage);
this.submitCollectedLinks(receivedPage);
});
completableFutureList.add(cf);
}
}
private void submitCollectedLinks(final Page page){
page.getLinks()
.stream()
.map(Page::new)
.forEach(this::submitTask);
}
private void savePage(final Page page)
{
this.pages.add(page);
}
}
您的代码存在几个问题。您计划在 completableFutureList
的快照完成时关闭执行程序服务,稍后可能会添加更多期货,但更糟糕的是,您达到了 // do something with pages
点,甚至没有快照尚未完成。
您没有显示 completableFutureList
的声明,但鉴于您从不同线程修改的 pages
和 submittedPages
已使用 HashSet
初始化,这不是线程安全的,我对这个列表也没有什么好感。但无论如何你都不需要这个列表。您应该将提交代码更改为 return futures,表示待处理任务正在 与后续任务 组合。传递给 thenCompose
的函数将在先决条件阶段完成后进行评估,换句话说,这允许在链接函数时依赖于未知的未来。
请注意,用线程安全结构替换 HashSet
是不够的。您必须避免在 add
之前调用 contains
之类的序列,因为无法保证在这两个调用之间没有其他线程会执行 add
(称为“check-then-act”反-图案)。你可以只使用 add
,它什么都不做,当元素已经存在时,你可以使用 return false
。使用正确的线程安全 Set
实现,它提供了所需的原子性。
把这些东西放在一起,你会得到,例如
public class Demo {
private final Set<Page> pages = ConcurrentHashMap.newKeySet();
private final Set<Page> submittedPages = ConcurrentHashMap.newKeySet();
private final ExecutorService executorService;
public Demo(final int numberOfThreads) {
this.executorService = Executors.newFixedThreadPool(numberOfThreads);
}
public void start(String url) {
this.submitTask(new Page(url))
// shutdown even in the exceptional case
.whenComplete((_void, throwable) -> executorService.shutdown())
.join(); // wait for completion before doing something with pages
// do something with pages
}
private CompletableFuture<Void> submitTask(final Page page) {
// use a single add to avoid check-then-act anti-pattern
if(this.submittedPages.add(page)) {
return CompletableFuture.supplyAsync(new Task(page)::call, executorService)
// compose with recursively encountered tasks
.thenCompose(receivedPage -> {
this.savePage(receivedPage);
return this.submitCollectedLinks(receivedPage);
});
}
// do nothing when already submitted
return CompletableFuture.completedFuture(null);
}
private CompletableFuture<Void> submitCollectedLinks(final Page page) {
return CompletableFuture.allOf(page.getLinks()
.stream().map(Page::new).map(this::submitTask)
.toArray(CompletableFuture<?>[]::new));
}
private void savePage(final Page page) {
this.pages.add(page);
}
}
做一些基本的网络抓取工具。
我想要 运行 使用 Completable future 与多个线程并行抓取。每个作业检索需要抓取的 Page 对象和 returns 包含已创建 urls 列表的 Page 对象。
列表中的每个 url 如果尚未提交以供抓取,则会开始新作业。完成所有并行作业后,我想继续逻辑。
如果我删除“allFutures.thenRun(() -> executorService.shutdown());”,此代码仅抓取第一页对象然后终止的问题然后它收集所有 pages/urls 但程序永远不会结束。
public class Demo
{
private final Set<Page> pages = new HashSet<>();
private final Set<Page> submittedPages = new HashSet<>();
private final ExecutorService executorService;
public Demo(final int numberOfThreads)
{
this.executorService = Executors.newFixedThreadPool(numberOfThreads);
}
public void start(String url) throws ExecutionException, InterruptedException
{
this.submitTask(new Page(url));
CompletableFuture<Void> allFutures = CompletableFuture.allOf(completableFutureList.toArray(new CompletableFuture[completableFutureList.size()]));
allFutures.thenRun(() -> executorService.shutdown());
// do something with pages
}
private void submitTask(final Page page)
{
if (!this.submittedPages.contains(page))
{
this.submittedPages.add(page);
CompletableFuture<Void> cf = CompletableFuture.supplyAsync(() -> new Task(page).call(), this.executorService) //want to run this parallel in multiple threads
.thenAccept(receivedPage -> {
this.savePage(receivedPage);
this.submitCollectedLinks(receivedPage);
});
completableFutureList.add(cf);
}
}
private void submitCollectedLinks(final Page page){
page.getLinks()
.stream()
.map(Page::new)
.forEach(this::submitTask);
}
private void savePage(final Page page)
{
this.pages.add(page);
}
}
您的代码存在几个问题。您计划在 completableFutureList
的快照完成时关闭执行程序服务,稍后可能会添加更多期货,但更糟糕的是,您达到了 // do something with pages
点,甚至没有快照尚未完成。
您没有显示 completableFutureList
的声明,但鉴于您从不同线程修改的 pages
和 submittedPages
已使用 HashSet
初始化,这不是线程安全的,我对这个列表也没有什么好感。但无论如何你都不需要这个列表。您应该将提交代码更改为 return futures,表示待处理任务正在 与后续任务 组合。传递给 thenCompose
的函数将在先决条件阶段完成后进行评估,换句话说,这允许在链接函数时依赖于未知的未来。
请注意,用线程安全结构替换 HashSet
是不够的。您必须避免在 add
之前调用 contains
之类的序列,因为无法保证在这两个调用之间没有其他线程会执行 add
(称为“check-then-act”反-图案)。你可以只使用 add
,它什么都不做,当元素已经存在时,你可以使用 return false
。使用正确的线程安全 Set
实现,它提供了所需的原子性。
把这些东西放在一起,你会得到,例如
public class Demo {
private final Set<Page> pages = ConcurrentHashMap.newKeySet();
private final Set<Page> submittedPages = ConcurrentHashMap.newKeySet();
private final ExecutorService executorService;
public Demo(final int numberOfThreads) {
this.executorService = Executors.newFixedThreadPool(numberOfThreads);
}
public void start(String url) {
this.submitTask(new Page(url))
// shutdown even in the exceptional case
.whenComplete((_void, throwable) -> executorService.shutdown())
.join(); // wait for completion before doing something with pages
// do something with pages
}
private CompletableFuture<Void> submitTask(final Page page) {
// use a single add to avoid check-then-act anti-pattern
if(this.submittedPages.add(page)) {
return CompletableFuture.supplyAsync(new Task(page)::call, executorService)
// compose with recursively encountered tasks
.thenCompose(receivedPage -> {
this.savePage(receivedPage);
return this.submitCollectedLinks(receivedPage);
});
}
// do nothing when already submitted
return CompletableFuture.completedFuture(null);
}
private CompletableFuture<Void> submitCollectedLinks(final Page page) {
return CompletableFuture.allOf(page.getLinks()
.stream().map(Page::new).map(this::submitTask)
.toArray(CompletableFuture<?>[]::new));
}
private void savePage(final Page page) {
this.pages.add(page);
}
}