在不传播的情况下访问对象中的 ApplicationCall
Access ApplicationCall in object without propagation
Ktor有线程安全的方法可以静态访问当前的ApplicationCall吗?我正在尝试使以下简单示例起作用;
object Main {
fun start() {
val server = embeddedServer(Jetty, 8081) {
intercept(ApplicationCallPipeline.Call) {
// START: this will be more dynamic in the future, we don't want to pass ApplicationCall
Addon.processRequest()
// END: this will be more dynamic in the future, we don't want to pass ApplicationCall
call.respondText(output, ContentType.Text.Html, HttpStatusCode.OK)
return@intercept finish()
}
}
server.start(wait = true)
}
}
fun main(args: Array<String>) {
Main.start();
}
object Addon {
fun processRequest() {
val call = RequestUtils.getCurrentApplicationCall()
// processing of call.request.queryParameters
// ...
}
}
object RequestUtils {
fun getCurrentApplicationCall(): ApplicationCall {
// Here is where I am getting lost..
return null
}
}
我希望能够从 RequestUtils 静态获取当前上下文的 ApplicationCall,以便我可以在任何地方访问有关请求的信息。这当然需要扩展才能同时处理多个请求。
我用依赖注入和 ThreadLocal 做了一些实验,但没有成功。
好吧,应用程序调用被传递给协程,因此尝试获取它真的很危险"statically",因为所有请求都在并发上下文中处理。
Kotlin 官方文档谈到Thread-local in the context of coroutine executions。它使用 CoroutineContext 的概念来恢复 specific/custom 协程上下文中的线程局部值。
但是,如果您能够设计一个完全异步的 API,您将能够通过直接创建一个自定义的 CoroutineContext,嵌入请求调用来绕过线程本地。
编辑:我更新了我的示例代码以测试 2 种口味:
- 异步端点:完全基于协程上下文和挂起函数的解决方案
- blocking 端点:使用线程本地存储应用程序调用,如 kotlin doc.
中所述
import io.ktor.server.engine.embeddedServer
import io.ktor.server.jetty.Jetty
import io.ktor.application.*
import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode
import io.ktor.response.respondText
import io.ktor.routing.get
import io.ktor.routing.routing
import kotlinx.coroutines.asContextElement
import kotlinx.coroutines.launch
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
/**
* Thread local in which you'll inject application call.
*/
private val localCall : ThreadLocal<ApplicationCall> = ThreadLocal();
object Main {
fun start() {
val server = embeddedServer(Jetty, 8081) {
routing {
// Solution requiring full coroutine/ supendable execution.
get("/async") {
// Ktor will launch this block of code in a coroutine, so you can create a subroutine with
// an overloaded context providing needed information.
launch(coroutineContext + ApplicationCallContext(call)) {
PrintQuery.processAsync()
}
}
// Solution based on Thread-Local, not requiring suspending functions
get("/blocking") {
launch (coroutineContext + localCall.asContextElement(value = call)) {
PrintQuery.processBlocking()
}
}
}
intercept(ApplicationCallPipeline.ApplicationPhase.Call) {
call.respondText("Hé ho", ContentType.Text.Plain, HttpStatusCode.OK)
}
}
server.start(wait = true)
}
}
fun main() {
Main.start();
}
interface AsyncAddon {
/**
* Asynchronicity propagates in order to properly access coroutine execution information
*/
suspend fun processAsync();
}
interface BlockingAddon {
fun processBlocking();
}
object PrintQuery : AsyncAddon, BlockingAddon {
override suspend fun processAsync() = processRequest("async", fetchCurrentCallFromCoroutineContext())
override fun processBlocking() = processRequest("blocking", fetchCurrentCallFromThreadLocal())
private fun processRequest(prefix : String, call : ApplicationCall?) {
println("$prefix -> Query parameter: ${call?.parameters?.get("q") ?: "NONE"}")
}
}
/**
* Custom coroutine context allow to provide information about request execution.
*/
private class ApplicationCallContext(val call : ApplicationCall) : AbstractCoroutineContextElement(Key) {
companion object Key : CoroutineContext.Key<ApplicationCallContext>
}
/**
* This is your RequestUtils rewritten as a first-order function. It defines as asynchronous.
* If not, you won't be able to access coroutineContext.
*/
suspend fun fetchCurrentCallFromCoroutineContext(): ApplicationCall? {
// Here is where I am getting lost..
return coroutineContext.get(ApplicationCallContext.Key)?.call
}
fun fetchCurrentCallFromThreadLocal() : ApplicationCall? {
return localCall.get()
}
您可以在导航器中进行测试:
http://localhost:8081/blocking?q=test1
http://localhost:8081/blocking?q=test2
http://localhost:8081/async?q=test3
服务器日志输出:
blocking -> Query parameter: test1
blocking -> Query parameter: test2
async -> Query parameter: test3
您要为此使用的关键机制是 CoroutineContext
。这是您可以设置要在任何子协程或挂起函数调用中使用的键值对的地方。
我会试着举个例子。
首先,让我们定义一个 CoroutineContextElement
,它可以让我们将 ApplicationCall
添加到 CoroutineContext
。
class ApplicationCallElement(var call: ApplicationCall?) : AbstractCoroutineContextElement(ApplicationCallElement) {
companion object Key : CoroutineContext.Key<ApplicationCallElement>
}
现在我们可以定义一些助手,它们将在我们的其中一条路线上添加 ApplicationCall
。 (这可以作为某种监听管道的 Ktor 插件来完成,但我不想在这里增加太多噪音)。
suspend fun PipelineContext<Unit, ApplicationCall>.withCall(
bodyOfCall: suspend PipelineContext<Unit, ApplicationCall>.() -> Unit
) {
val pipeline = this
val appCallContext = buildAppCallContext(this.call)
withContext(appCallContext) {
pipeline.bodyOfCall()
}
}
internal suspend fun buildAppCallContext(call: ApplicationCall): CoroutineContext {
var context = coroutineContext
val callElement = ApplicationCallElement(call)
context = context.plus(callElement)
return context
}
然后我们可以一起使用它们,就像下面的这个测试用例一样,我们可以从嵌套的挂起函数中获取调用:
suspend fun getSomethingFromCall(): String {
val call = coroutineContext[ApplicationCallElement.Key]?.call ?: throw Exception("Element not set")
return call.parameters["key"] ?: throw Exception("Parameter not set")
}
fun Application.myApp() {
routing {
route("/foo") {
get {
withCall {
call.respondText(getSomethingFromCall())
}
}
}
}
}
class ApplicationCallTest {
@Test
fun `we can get the application call in a nested function`() {
withTestApplication({ myApp() }) {
with(handleRequest(HttpMethod.Get, "/foo?key=bar")) {
assertEquals(HttpStatusCode.OK, response.status())
assertEquals("bar", response.content)
}
}
}
}
Ktor有线程安全的方法可以静态访问当前的ApplicationCall吗?我正在尝试使以下简单示例起作用;
object Main {
fun start() {
val server = embeddedServer(Jetty, 8081) {
intercept(ApplicationCallPipeline.Call) {
// START: this will be more dynamic in the future, we don't want to pass ApplicationCall
Addon.processRequest()
// END: this will be more dynamic in the future, we don't want to pass ApplicationCall
call.respondText(output, ContentType.Text.Html, HttpStatusCode.OK)
return@intercept finish()
}
}
server.start(wait = true)
}
}
fun main(args: Array<String>) {
Main.start();
}
object Addon {
fun processRequest() {
val call = RequestUtils.getCurrentApplicationCall()
// processing of call.request.queryParameters
// ...
}
}
object RequestUtils {
fun getCurrentApplicationCall(): ApplicationCall {
// Here is where I am getting lost..
return null
}
}
我希望能够从 RequestUtils 静态获取当前上下文的 ApplicationCall,以便我可以在任何地方访问有关请求的信息。这当然需要扩展才能同时处理多个请求。
我用依赖注入和 ThreadLocal 做了一些实验,但没有成功。
好吧,应用程序调用被传递给协程,因此尝试获取它真的很危险"statically",因为所有请求都在并发上下文中处理。
Kotlin 官方文档谈到Thread-local in the context of coroutine executions。它使用 CoroutineContext 的概念来恢复 specific/custom 协程上下文中的线程局部值。
但是,如果您能够设计一个完全异步的 API,您将能够通过直接创建一个自定义的 CoroutineContext,嵌入请求调用来绕过线程本地。
编辑:我更新了我的示例代码以测试 2 种口味:
- 异步端点:完全基于协程上下文和挂起函数的解决方案
- blocking 端点:使用线程本地存储应用程序调用,如 kotlin doc. 中所述
import io.ktor.server.engine.embeddedServer
import io.ktor.server.jetty.Jetty
import io.ktor.application.*
import io.ktor.http.ContentType
import io.ktor.http.HttpStatusCode
import io.ktor.response.respondText
import io.ktor.routing.get
import io.ktor.routing.routing
import kotlinx.coroutines.asContextElement
import kotlinx.coroutines.launch
import kotlin.coroutines.AbstractCoroutineContextElement
import kotlin.coroutines.CoroutineContext
import kotlin.coroutines.coroutineContext
/**
* Thread local in which you'll inject application call.
*/
private val localCall : ThreadLocal<ApplicationCall> = ThreadLocal();
object Main {
fun start() {
val server = embeddedServer(Jetty, 8081) {
routing {
// Solution requiring full coroutine/ supendable execution.
get("/async") {
// Ktor will launch this block of code in a coroutine, so you can create a subroutine with
// an overloaded context providing needed information.
launch(coroutineContext + ApplicationCallContext(call)) {
PrintQuery.processAsync()
}
}
// Solution based on Thread-Local, not requiring suspending functions
get("/blocking") {
launch (coroutineContext + localCall.asContextElement(value = call)) {
PrintQuery.processBlocking()
}
}
}
intercept(ApplicationCallPipeline.ApplicationPhase.Call) {
call.respondText("Hé ho", ContentType.Text.Plain, HttpStatusCode.OK)
}
}
server.start(wait = true)
}
}
fun main() {
Main.start();
}
interface AsyncAddon {
/**
* Asynchronicity propagates in order to properly access coroutine execution information
*/
suspend fun processAsync();
}
interface BlockingAddon {
fun processBlocking();
}
object PrintQuery : AsyncAddon, BlockingAddon {
override suspend fun processAsync() = processRequest("async", fetchCurrentCallFromCoroutineContext())
override fun processBlocking() = processRequest("blocking", fetchCurrentCallFromThreadLocal())
private fun processRequest(prefix : String, call : ApplicationCall?) {
println("$prefix -> Query parameter: ${call?.parameters?.get("q") ?: "NONE"}")
}
}
/**
* Custom coroutine context allow to provide information about request execution.
*/
private class ApplicationCallContext(val call : ApplicationCall) : AbstractCoroutineContextElement(Key) {
companion object Key : CoroutineContext.Key<ApplicationCallContext>
}
/**
* This is your RequestUtils rewritten as a first-order function. It defines as asynchronous.
* If not, you won't be able to access coroutineContext.
*/
suspend fun fetchCurrentCallFromCoroutineContext(): ApplicationCall? {
// Here is where I am getting lost..
return coroutineContext.get(ApplicationCallContext.Key)?.call
}
fun fetchCurrentCallFromThreadLocal() : ApplicationCall? {
return localCall.get()
}
您可以在导航器中进行测试:
http://localhost:8081/blocking?q=test1
http://localhost:8081/blocking?q=test2
http://localhost:8081/async?q=test3
服务器日志输出:
blocking -> Query parameter: test1
blocking -> Query parameter: test2
async -> Query parameter: test3
您要为此使用的关键机制是 CoroutineContext
。这是您可以设置要在任何子协程或挂起函数调用中使用的键值对的地方。
我会试着举个例子。
首先,让我们定义一个 CoroutineContextElement
,它可以让我们将 ApplicationCall
添加到 CoroutineContext
。
class ApplicationCallElement(var call: ApplicationCall?) : AbstractCoroutineContextElement(ApplicationCallElement) {
companion object Key : CoroutineContext.Key<ApplicationCallElement>
}
现在我们可以定义一些助手,它们将在我们的其中一条路线上添加 ApplicationCall
。 (这可以作为某种监听管道的 Ktor 插件来完成,但我不想在这里增加太多噪音)。
suspend fun PipelineContext<Unit, ApplicationCall>.withCall(
bodyOfCall: suspend PipelineContext<Unit, ApplicationCall>.() -> Unit
) {
val pipeline = this
val appCallContext = buildAppCallContext(this.call)
withContext(appCallContext) {
pipeline.bodyOfCall()
}
}
internal suspend fun buildAppCallContext(call: ApplicationCall): CoroutineContext {
var context = coroutineContext
val callElement = ApplicationCallElement(call)
context = context.plus(callElement)
return context
}
然后我们可以一起使用它们,就像下面的这个测试用例一样,我们可以从嵌套的挂起函数中获取调用:
suspend fun getSomethingFromCall(): String {
val call = coroutineContext[ApplicationCallElement.Key]?.call ?: throw Exception("Element not set")
return call.parameters["key"] ?: throw Exception("Parameter not set")
}
fun Application.myApp() {
routing {
route("/foo") {
get {
withCall {
call.respondText(getSomethingFromCall())
}
}
}
}
}
class ApplicationCallTest {
@Test
fun `we can get the application call in a nested function`() {
withTestApplication({ myApp() }) {
with(handleRequest(HttpMethod.Get, "/foo?key=bar")) {
assertEquals(HttpStatusCode.OK, response.status())
assertEquals("bar", response.content)
}
}
}
}