用于异步处理的短暂 ExecutorService

Short lived ExecutorService for async processing

我有一个方法可以执行异步请求并忘记时尚。

方法实现如下:

private void publishWorkItem(final Object payload, final ZkWorkCompleteCallback callback)
{
    if (payload == null)
        throw new NullPointerException();

    final ExecutorService executor = Executors.newSingleThreadExecutor(PUBLISH_WORK_THREAD_FACTORY);

    try
    {
        executor.execute(() -> {

            try
            {
                if (callback != null)
                {
                    final ZkWorkItem retval = publishWorkItem(payload);
                    callback.onCompleted(retval);
                }
            }
            catch (final InterruptedException e)
            {
                // suppressed
            }
            catch (final Exception e)
            {
                LOGGER.error("Unhandled exception", e);

                if (callback != null)
                    callback.onError(e);
            }
        });
    }
    finally
    {
        executor.shutdown();
    }
}

问题是我正在为每个异步请求创建新的 ExecutorService Executors.newSingleThreadExecutor,而不是使用固定线程池。原因是 publishWorkItem(payload) 方法使用了 CountDownLatch#await() ,这反过来会阻塞正在执行的线程,因为它等待 Watcher 完成。这可能会很快耗尽固定大小的池。

publishWorkItem(payload)

的简化代码
  final CountDownLatch latch = new CountDownLatch(1);

        zkClient.exists(pathToWatch, new Watcher()
        {
            @Override
            public void process(final WatchedEvent event)
            {
                try
                {
                    extractAndDelete(baos, event.getPath());
                }
                catch (final Exception e)
                {
                    LOGGER.error("Unable to perform cleanup", e);
                }
                finally
                {
                    latch.countDown();
                }
            }
        }, true);

       ------ THIS IS THE PROBLEM (Blocks current thread) ------ 
       latch.await(); 

所以我的问题是:是否有更好的方法来解决此类问题。

我对应用程序进行了分析,但没有发现任何性能问题,我担心的是它创建了大量线程。

你为什么不使用 ExecutorService.newCachedThreadPool()

根据 javadoc,它适合您的用例

These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks ... will reuse previously constructed threads if available

您不必在每次调用 publishWorkItem() 时都创建一个新的单线程池,而是创建一次缓存线程池并用于所有查询。线程数上限为 Integer.MAX_VALUE,因此您不会像固定线程池那样受到限制,但总体上应该创建更少的线程。