用于异步处理的短暂 ExecutorService
Short lived ExecutorService for async processing
我有一个方法可以执行异步请求并忘记时尚。
方法实现如下:
private void publishWorkItem(final Object payload, final ZkWorkCompleteCallback callback)
{
if (payload == null)
throw new NullPointerException();
final ExecutorService executor = Executors.newSingleThreadExecutor(PUBLISH_WORK_THREAD_FACTORY);
try
{
executor.execute(() -> {
try
{
if (callback != null)
{
final ZkWorkItem retval = publishWorkItem(payload);
callback.onCompleted(retval);
}
}
catch (final InterruptedException e)
{
// suppressed
}
catch (final Exception e)
{
LOGGER.error("Unhandled exception", e);
if (callback != null)
callback.onError(e);
}
});
}
finally
{
executor.shutdown();
}
}
问题是我正在为每个异步请求创建新的 ExecutorService Executors.newSingleThreadExecutor
,而不是使用固定线程池。原因是 publishWorkItem(payload)
方法使用了 CountDownLatch#await()
,这反过来会阻塞正在执行的线程,因为它等待 Watcher
完成。这可能会很快耗尽固定大小的池。
publishWorkItem(payload)
的简化代码
final CountDownLatch latch = new CountDownLatch(1);
zkClient.exists(pathToWatch, new Watcher()
{
@Override
public void process(final WatchedEvent event)
{
try
{
extractAndDelete(baos, event.getPath());
}
catch (final Exception e)
{
LOGGER.error("Unable to perform cleanup", e);
}
finally
{
latch.countDown();
}
}
}, true);
------ THIS IS THE PROBLEM (Blocks current thread) ------
latch.await();
所以我的问题是:是否有更好的方法来解决此类问题。
我对应用程序进行了分析,但没有发现任何性能问题,我担心的是它创建了大量线程。
你为什么不使用 ExecutorService.newCachedThreadPool()
?
根据 javadoc,它适合您的用例
These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks ... will reuse previously constructed threads if available
您不必在每次调用 publishWorkItem()
时都创建一个新的单线程池,而是创建一次缓存线程池并用于所有查询。线程数上限为 Integer.MAX_VALUE
,因此您不会像固定线程池那样受到限制,但总体上应该创建更少的线程。
我有一个方法可以执行异步请求并忘记时尚。
方法实现如下:
private void publishWorkItem(final Object payload, final ZkWorkCompleteCallback callback)
{
if (payload == null)
throw new NullPointerException();
final ExecutorService executor = Executors.newSingleThreadExecutor(PUBLISH_WORK_THREAD_FACTORY);
try
{
executor.execute(() -> {
try
{
if (callback != null)
{
final ZkWorkItem retval = publishWorkItem(payload);
callback.onCompleted(retval);
}
}
catch (final InterruptedException e)
{
// suppressed
}
catch (final Exception e)
{
LOGGER.error("Unhandled exception", e);
if (callback != null)
callback.onError(e);
}
});
}
finally
{
executor.shutdown();
}
}
问题是我正在为每个异步请求创建新的 ExecutorService Executors.newSingleThreadExecutor
,而不是使用固定线程池。原因是 publishWorkItem(payload)
方法使用了 CountDownLatch#await()
,这反过来会阻塞正在执行的线程,因为它等待 Watcher
完成。这可能会很快耗尽固定大小的池。
publishWorkItem(payload)
final CountDownLatch latch = new CountDownLatch(1);
zkClient.exists(pathToWatch, new Watcher()
{
@Override
public void process(final WatchedEvent event)
{
try
{
extractAndDelete(baos, event.getPath());
}
catch (final Exception e)
{
LOGGER.error("Unable to perform cleanup", e);
}
finally
{
latch.countDown();
}
}
}, true);
------ THIS IS THE PROBLEM (Blocks current thread) ------
latch.await();
所以我的问题是:是否有更好的方法来解决此类问题。
我对应用程序进行了分析,但没有发现任何性能问题,我担心的是它创建了大量线程。
你为什么不使用 ExecutorService.newCachedThreadPool()
?
根据 javadoc,它适合您的用例
These pools will typically improve the performance of programs that execute many short-lived asynchronous tasks ... will reuse previously constructed threads if available
您不必在每次调用 publishWorkItem()
时都创建一个新的单线程池,而是创建一次缓存线程池并用于所有查询。线程数上限为 Integer.MAX_VALUE
,因此您不会像固定线程池那样受到限制,但总体上应该创建更少的线程。