如何为 Akka HTTP 配置特定的调度程序?
How to configure specific dispatcher for Akka HTTP?
我有一个简单的 Akka HTTP 应用程序,它使用的是 websocket。我的请求处理程序有阻塞调用(例如 JDBC)。所以,我需要使用一些固定大小的线程池来处理这样的代码。
所以,据我了解,我应该使用 application.conf(像这样 - https://github.com/mkuthan/example-akka-http/blob/master/src/main/resources/application.conf)。但是我不知道如何使用固定的持续线程配置自定义线程池。
当我 运行 我的应用程序进行线程转储时,我看到两个线程名称:
- akka-system-akka.actor.default-dispatcher-62
- 路由-akka.actor.default-dispatcher-79
不明白,这些线程池是干嘛的。
我尝试设置默认线程池,如下所示:
akka {
actor {
default-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 40
}
}
}
}
效果超级奇怪:
- 每个线程池有自己的40个线程,所以我有80个线程。据我了解,每个启动的调度程序都有自己的 40 个线程。不好。
- 它不是 FixedThreadPool - https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool(int)。应用程序启动时。没有任何线程。然后,当应用程序处理了一些请求时,就会产生线程。当一段时间没有传入请求时,线程已经死亡。
您可能使用两个名称 akka-system-akka
和 Routes-akka
启动了两个单独的 ActorSystems,因此您会在日志中看到两种类型的线程名称。
thread-pool-executor
是 ThreadPoolExecutor 由 java 运行time 定义的,它不是 FixedThreadPool。
A ThreadPoolExecutor will automatically adjust the pool size (see
getPoolSize()) according to the bounds set by corePoolSize (see
getCorePoolSize()) and maximumPoolSize (see getMaximumPoolSize()).
When a new task is submitted in method execute(Runnable), and fewer
than corePoolSize threads are running, a new thread is created to
handle the request, even if other worker threads are idle. If there
are more than corePoolSize but less than maximumPoolSize threads
running, a new thread will be created only if the queue is full. By
setting corePoolSize and maximumPoolSize the same, you create a
fixed-size thread pool. By setting maximumPoolSize to an essentially
unbounded value such as Integer.MAX_VALUE, you allow the pool to
accommodate an arbitrary number of concurrent tasks. Most typically,
core and maximum pool sizes are set only upon construction, but they
may also be changed dynamically using setCorePoolSize(int) and
setMaximumPoolSize(int).
因此,您可以看到它以小于定义大小的大小开始,并根据传入任务增加线程数。
阅读解释线程如何终止的 Keep-alive times
部分。
来到手头的问题:
不要覆盖 default-dispatcher
,因为它被 akka 用于其他目的,即将执行的消息传递给所有参与者,这很容易被错误配置。而且,最重要的是,不要 运行 在默认调度程序上阻塞任务(阅读 akka docs 了解更多详细信息)。而是引入一个单独的调度程序并在那里执行您的阻塞代码。
这是一个如何使用普通 http 请求的示例(抱歉,我以前没有使用过网络套接字)
val asyncHandler: HttpRequest => Future[HttpResponse] = { req =>
val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
Future {
// blocking call
HttpRespone(???)
}(blockingExecutionContext) // this can be passed implicitly too
}
Http().bindAndHandleAsync(asyncHandler, "localhost")
blocking-dispatcher
必须在 application.conf
中配置,类似于您对 default-dispatcher
所做的配置,但它是在配置文件的根目录中定义的。
blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 2
core-pool-size-factor = 2.0
core-pool-size-max = 10
}
}
一般来说,将阻塞执行隔离到单独的执行上下文中,您可以专门为此类阻塞操作配置该执行上下文。由于执行任务的类型由您限制和控制,因此更容易微调这种方法。
我有一个简单的 Akka HTTP 应用程序,它使用的是 websocket。我的请求处理程序有阻塞调用(例如 JDBC)。所以,我需要使用一些固定大小的线程池来处理这样的代码。
所以,据我了解,我应该使用 application.conf(像这样 - https://github.com/mkuthan/example-akka-http/blob/master/src/main/resources/application.conf)。但是我不知道如何使用固定的持续线程配置自定义线程池。
当我 运行 我的应用程序进行线程转储时,我看到两个线程名称:
- akka-system-akka.actor.default-dispatcher-62
- 路由-akka.actor.default-dispatcher-79
不明白,这些线程池是干嘛的。
我尝试设置默认线程池,如下所示:
akka {
actor {
default-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
fixed-pool-size = 40
}
}
}
}
效果超级奇怪:
- 每个线程池有自己的40个线程,所以我有80个线程。据我了解,每个启动的调度程序都有自己的 40 个线程。不好。
- 它不是 FixedThreadPool - https://docs.oracle.com/javase/7/docs/api/java/util/concurrent/Executors.html#newFixedThreadPool(int)。应用程序启动时。没有任何线程。然后,当应用程序处理了一些请求时,就会产生线程。当一段时间没有传入请求时,线程已经死亡。
您可能使用两个名称 akka-system-akka
和 Routes-akka
启动了两个单独的 ActorSystems,因此您会在日志中看到两种类型的线程名称。
thread-pool-executor
是 ThreadPoolExecutor 由 java 运行time 定义的,它不是 FixedThreadPool。
A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize()) according to the bounds set by corePoolSize (see getCorePoolSize()) and maximumPoolSize (see getMaximumPoolSize()). When a new task is submitted in method execute(Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).
因此,您可以看到它以小于定义大小的大小开始,并根据传入任务增加线程数。
阅读解释线程如何终止的 Keep-alive times
部分。
来到手头的问题:
不要覆盖 default-dispatcher
,因为它被 akka 用于其他目的,即将执行的消息传递给所有参与者,这很容易被错误配置。而且,最重要的是,不要 运行 在默认调度程序上阻塞任务(阅读 akka docs 了解更多详细信息)。而是引入一个单独的调度程序并在那里执行您的阻塞代码。
这是一个如何使用普通 http 请求的示例(抱歉,我以前没有使用过网络套接字)
val asyncHandler: HttpRequest => Future[HttpResponse] = { req =>
val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
Future {
// blocking call
HttpRespone(???)
}(blockingExecutionContext) // this can be passed implicitly too
}
Http().bindAndHandleAsync(asyncHandler, "localhost")
blocking-dispatcher
必须在 application.conf
中配置,类似于您对 default-dispatcher
所做的配置,但它是在配置文件的根目录中定义的。
blocking-dispatcher {
type = Dispatcher
executor = "thread-pool-executor"
thread-pool-executor {
core-pool-size-min = 2
core-pool-size-factor = 2.0
core-pool-size-max = 10
}
}
一般来说,将阻塞执行隔离到单独的执行上下文中,您可以专门为此类阻塞操作配置该执行上下文。由于执行任务的类型由您限制和控制,因此更容易微调这种方法。