如何为 Akka HTTP 配置特定的调度程序?

How to configure specific dispatcher for Akka HTTP?

我有一个简单的 Akka HTTP 应用程序,它使用的是 websocket。我的请求处理程序有阻塞调用(例如 JDBC)。所以,我需要使用一些固定大小的线程池来处理这样的代码。

所以,据我了解,我应该使用 application.conf(像这样 - https://github.com/mkuthan/example-akka-http/blob/master/src/main/resources/application.conf)。但是我不知道如何使用固定的持续线程配置自定义线程池。

当我 运行 我的应用程序进行线程转储时,我看到两个线程名称:

不明白,这些线程池是干嘛的。

我尝试设置默认线程池,如下所示:

akka {
  actor {
    default-dispatcher {
      type = Dispatcher
      executor = "thread-pool-executor"
      thread-pool-executor {
        fixed-pool-size = 40
      }
    }
  }
}

效果超级奇怪:

您可能使用两个名称 akka-system-akkaRoutes-akka 启动了两个单独的 ActorSystems,因此您会在日志中看到两种类型的线程名称。

thread-pool-executorThreadPoolExecutor 由 java 运行time 定义的,它不是 FixedThreadPool。

A ThreadPoolExecutor will automatically adjust the pool size (see getPoolSize()) according to the bounds set by corePoolSize (see getCorePoolSize()) and maximumPoolSize (see getMaximumPoolSize()). When a new task is submitted in method execute(Runnable), and fewer than corePoolSize threads are running, a new thread is created to handle the request, even if other worker threads are idle. If there are more than corePoolSize but less than maximumPoolSize threads running, a new thread will be created only if the queue is full. By setting corePoolSize and maximumPoolSize the same, you create a fixed-size thread pool. By setting maximumPoolSize to an essentially unbounded value such as Integer.MAX_VALUE, you allow the pool to accommodate an arbitrary number of concurrent tasks. Most typically, core and maximum pool sizes are set only upon construction, but they may also be changed dynamically using setCorePoolSize(int) and setMaximumPoolSize(int).

因此,您可以看到它以小于定义大小的大小开始,并根据传入任务增加线程数。

阅读解释线程如何终止的 Keep-alive times 部分。

来到手头的问题:

不要覆盖 default-dispatcher,因为它被 akka 用于其他目的,即将执行的消息传递给所有参与者,这很容易被错误配置。而且,最重要的是,不要 运行 在默认调度程序上阻塞任务(阅读 akka docs 了解更多详细信息)。而是引入一个单独的调度程序并在那里执行您的阻塞代码。

这是一个如何使用普通 http 请求的示例(抱歉,我以前没有使用过网络套接字)

  val asyncHandler: HttpRequest => Future[HttpResponse] = { req =>
    val blockingExecutionContext = system.dispatchers.lookup("blocking-dispatcher")
    Future {
      // blocking call 
      HttpRespone(???)
    }(blockingExecutionContext) // this can be passed implicitly too
  }
  Http().bindAndHandleAsync(asyncHandler, "localhost")

blocking-dispatcher 必须在 application.conf 中配置,类似于您对 default-dispatcher 所做的配置,但它是在配置文件的根目录中定义的。

blocking-dispatcher {
  type = Dispatcher
  executor = "thread-pool-executor"
  thread-pool-executor {
    core-pool-size-min = 2
    core-pool-size-factor = 2.0
    core-pool-size-max = 10
  }
}

一般来说,将阻塞执行隔离到单独的执行上下文中,您可以专门为此类阻塞操作配置该执行上下文。由于执行任务的类型由您限制和控制,因此更容易微调这种方法。