How to implement multithreaded request hedging in Kotlin?
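Spring's Reactor has an interesting feature: hedging. It means firing off many requests, taking the first result that comes back, and automatically cleaning up the other contexts. Josh Long has recently been actively promoting this feature; googling "Spring reactor hedging" turns up related results. If anybody is curious, here is the sample code. In short, Flux.first() hides all the underlying hassle, which is very impressive.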
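I wonder how to achieve this with Kotlin coroutines and multithreading (perhaps with Flow or Channel). I came up with a simple scenario: a service accepts a longUrl, dispatches it to many URL-shortening services (such as IsGd, TinyUrl, ...), returns the first URL that comes back, and terminates/cleans up the other threads and coroutine resources.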
An interface, UrlShorter, defines this work:
interface UrlShorter {
    fun getShortUrl(longUrl: String): String?
}
There are three implementations: one for is.gd, another for TinyURL, and a third, "dumb" implementation that blocks for 10 seconds and returns null:
class IsgdImpl : UrlShorter {
    override fun getShortUrl(longUrl: String): String? {
        logger.info("running : {}", Thread.currentThread().name)
        // the is.gd API URL is blocked by SO, it sucks; see the linked gist for the full code
        val url = "https://is.gd/_create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl, "UTF-8"))
        return Request.Get(url).execute().returnContent().asString().also {
            logger.info("returning {}", it)
        }
    }
}
class TinyImpl : UrlShorter {
    override fun getShortUrl(longUrl: String): String? {
        logger.info("running : {}", Thread.currentThread().name)
        // sorry, the real URL is blocked by Stack Overflow; see the linked gist for the full code
        val url = "http://tinyurl.com/_api-create.php?url=$longUrl"
        return Request.Get(url).execute().returnContent().asString().also {
            logger.info("returning {}", it)
        }
    }
}
class DumbImpl : UrlShorter {
    override fun getShortUrl(longUrl: String): String? {
        logger.info("running : {}", Thread.currentThread().name)
        TimeUnit.SECONDS.sleep(10)
        return null
    }
}
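There is also a UrlShorterService that accepts all the UrlShorter implementations, spawns coroutines for them and tries to take the first result.
Here is what I came up with: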
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterService(private val impls: List<UrlShorter>) {
    private val es: ExecutorService = Executors.newFixedThreadPool(impls.size)
    private val esDispatcher = es.asCoroutineDispatcher()

    suspend fun getShortUrl(longUrl: String): String {
        return method1(longUrl) // there are other methods, with different approaches...
    }

    private inline fun <T, R : Any> Iterable<T>.firstNotNullResult(transform: (T) -> R?): R? {
        for (element in this) {
            val result = transform(element)
            if (result != null) return result
        }
        return null
    }

    // method1 ... method5, shown below, are also members of this class
}
The client is also simple:
@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterServiceTest {
    @Test
    fun testHedging() {
        val impls = listOf(DumbImpl(), IsgdImpl(), TinyImpl()) // Dumb first
        val service = UrlShorterService(impls)
        runBlocking {
            service.getShortUrl("https://www.google.com").also {
                logger.info("result = {}", it)
            }
        }
    }
}
Note that I put DumbImpl first, because I expect it to be spawned first and block its thread. The other two implementations can both return a result.
OK, here is the question: how do I implement hedging in Kotlin? I tried the following:
private suspend fun method1(longUrl: String): String {
    return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl ->
        flow {
            impl.getShortUrl(longUrl)?.also {
                emit(it)
            }
        }.flowOn(esDispatcher)
    }.first()
        .also { esDispatcher.cancelChildren() } // doesn't impact the result
}
I expected method1 to work, but it took 10 seconds in total:
00:56:09,253 INFO TinyImpl - running : pool-1-thread-3
00:56:09,254 INFO DumbImpl - running : pool-1-thread-1
00:56:09,253 INFO IsgdImpl - running : pool-1-thread-2
00:56:11,150 INFO TinyImpl - returning // tiny url blocked by SO, it sucks
00:56:13,604 INFO IsgdImpl - returning // isGd url blocked by SO, it sucks
00:56:19,261 INFO UrlShorterServiceTest$testHedging - result = // tiny url blocked by SO, it sucks
Then I came up with method2, method3, method4, method5, ..., but none of them works:
/**
 * 00:54:29,035 INFO IsgdImpl - running : pool-1-thread-3
 * 00:54:29,036 INFO DumbImpl - running : pool-1-thread-2
 * 00:54:29,035 INFO TinyImpl - running : pool-1-thread-1
 * 00:54:30,228 INFO TinyImpl - returning // tiny url blocked by SO, it sucks
 * 00:54:30,797 INFO IsgdImpl - returning // isGd url blocked by SO, it sucks
 * 00:54:39,046 INFO UrlShorterServiceTest$testHedging - result = // isGd url blocked by SO, it sucks
 */
private suspend fun method2(longUrl: String): String {
    return withContext(esDispatcher) {
        impls.map { impl ->
            async(esDispatcher) {
                impl.getShortUrl(longUrl)
            }
        }.firstNotNullResult { it.await() } ?: longUrl
    }
}
/**
 * 00:52:30,681 INFO IsgdImpl - running : pool-1-thread-2
 * 00:52:30,682 INFO DumbImpl - running : pool-1-thread-1
 * 00:52:30,681 INFO TinyImpl - running : pool-1-thread-3
 * 00:52:31,838 INFO TinyImpl - returning // tiny url blocked by SO, it sucks
 * 00:52:33,721 INFO IsgdImpl - returning // isGd url blocked by SO, it sucks
 * 00:52:40,691 INFO UrlShorterServiceTest$testHedging - result = // isGd url blocked by SO, it sucks
 */
private suspend fun method3(longUrl: String): String {
    return coroutineScope {
        impls.map { impl ->
            async(esDispatcher) {
                impl.getShortUrl(longUrl)
            }
        }.firstNotNullResult { it.await() } ?: longUrl
    }
}
/**
 * 01:58:56,930 INFO TinyImpl - running : pool-1-thread-1
 * 01:58:56,933 INFO DumbImpl - running : pool-1-thread-2
 * 01:58:56,930 INFO IsgdImpl - running : pool-1-thread-3
 * 01:58:58,411 INFO TinyImpl - returning // tiny url blocked by SO, it sucks
 * 01:58:59,026 INFO IsgdImpl - returning // isGd url blocked by SO, it sucks
 * 01:59:06,942 INFO UrlShorterServiceTest$testHedging - result = // isGd url blocked by SO, it sucks
 */
private suspend fun method4(longUrl: String): String {
    return withContext(esDispatcher) {
        impls.map { impl ->
            async {
                impl.getShortUrl(longUrl)
            }
        }.firstNotNullResult { it.await() } ?: longUrl
    }
}
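A likely reason methods 2 to 4 stall: `firstNotNullResult { it.await() }` awaits the deferreds in list order, so with DumbImpl first it suspends on that one for the full 10 seconds before it even looks at the others. The minimal sketch below contrasts list-order awaiting with `select`/`onAwait`, which resumes on whichever deferred completes first. The services here are hypothetical suspending stand-ins (`delay` instead of real HTTP calls), so the losing one can actually be cancelled; a thread blocked in `TimeUnit.SECONDS.sleep` cannot.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select
import kotlin.system.measureTimeMillis

// Hypothetical stand-ins: a slow service that returns null and a fast one that succeeds.
suspend fun slowService(): String? { delay(1_000); return null }
suspend fun fastService(): String? { delay(100); return "fast" }

// Awaits the deferreds in list order, as firstNotNullResult { it.await() } does.
suspend fun inListOrder(): String? = coroutineScope {
    val all = listOf(async { slowService() }, async { fastService() })
    all.firstNotNullOfOrNull { it.await() }
}

// Resumes with whichever deferred completes first, then cancels the rest.
suspend fun inCompletionOrder(): String? = coroutineScope {
    val all = listOf(async { slowService() }, async { fastService() })
    select<String?> { all.forEach { d -> d.onAwait { it } } }
        .also { coroutineContext.cancelChildren() }
}

fun main() = runBlocking {
    var result: String? = null
    val listOrderMs = measureTimeMillis { result = inListOrder() }
    println("list order: $result after $listOrderMs ms")
    val completionOrderMs = measureTimeMillis { result = inCompletionOrder() }
    println("completion order: $result after $completionOrderMs ms")
}
```

Both variants return "fast", but the list-order one only gets there after the slow service finishes.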
I'm not familiar with Channel; sorry, this one throws an exception:
/**
 * 01:29:44,460 INFO UrlShorterService$method5 - channel closed
 * 01:29:44,461 INFO DumbImpl - running : pool-1-thread-2
 * 01:29:44,460 INFO IsgdImpl - running : pool-1-thread-3
 * 01:29:44,466 INFO TinyImpl - running : pool-1-thread-1
 * 01:29:45,765 INFO TinyImpl - returning // tiny url blocked by SO, it sucks
 * 01:29:46,339 INFO IsgdImpl - returning // isGd url blocked by SO, it sucks
 *
 * kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
 *
 */
private suspend fun method5(longUrl: String): String {
    val channel = Channel<String>()
    withContext(esDispatcher) {
        impls.forEach { impl ->
            launch {
                impl.getShortUrl(longUrl)?.also {
                    channel.send(it)
                }
            }
        }
        channel.close()
        logger.info("channel closed")
    }
    return channel.consumeAsFlow().first()
}
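In method5, `channel.close()` runs as soon as the `launch` calls return, before any implementation has sent anything (the "channel closed" line is logged first), hence the `ClosedSendChannelException`. Even without the close, nothing receives from the rendezvous channel until `withContext` returns, and `withContext` waits for its children, so the senders would suspend forever. Below is a minimal channel-based sketch, assuming suspendable implementations modelled as plain suspend lambdas (the function name `firstNonNullViaChannel` is hypothetical): the receiver runs concurrently with the senders, the channel is buffered so no sender ever waits, and the remaining work is cancelled once a non-null result arrives.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical: each shortening service is modelled as a suspend function returning a URL or null.
suspend fun firstNonNullViaChannel(services: List<suspend () -> String?>): String? = coroutineScope {
    // Capacity = number of senders, so no send ever suspends, even after we stop receiving.
    val results = Channel<String?>(capacity = services.size)
    services.forEach { service ->
        launch { results.send(service()) }
    }
    // Receive while the senders are still running: one value per service, in completion order.
    var found: String? = null
    for (attempt in services.indices) {
        found = results.receive()
        if (found != null) break
    }
    coroutineContext.cancelChildren() // stop the services that have not answered yet
    found
}
```

Exceptions thrown by a service are not handled here: a failing `launch` cancels the whole scope.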
Well, I don't know whether there are other ways, but none of the above works: every one of them blocks for at least 10 seconds (held up by DumbImpl).
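One reason every variant takes at least 10 seconds: `TimeUnit.SECONDS.sleep(10)` in DumbImpl blocks a thread and never checks for coroutine cancellation, while the scopes these methods use wait for all their children to finish, including cancelled ones. The sketch below uses a hypothetical 2-second blocking call as a stand-in for DumbImpl and shows what wrapping it in `runInterruptible` (a kotlinx.coroutines function, not used in the question) changes: cancellation then interrupts the blocked thread. This only helps when the blocking code reacts to thread interrupts; `Thread.sleep` does, but some IO libraries do not.

```kotlin
import kotlinx.coroutines.*
import java.util.concurrent.TimeUnit

// Hypothetical stand-in for DumbImpl: blocks the calling thread for 2 seconds.
fun blockingDumb(): String? {
    TimeUnit.SECONDS.sleep(2)
    return null
}

// Starts the blocking call, cancels it after 100 ms and returns how long cancelAndJoin() took.
suspend fun cancelAfter100ms(interruptible: Boolean): Long = coroutineScope {
    val job = launch(Dispatchers.IO) {
        if (interruptible) runInterruptible { blockingDumb() } // cancellation interrupts the thread
        else blockingDumb()                                    // cancellation is ignored until sleep ends
    }
    delay(100)
    val start = System.currentTimeMillis()
    job.cancelAndJoin()
    System.currentTimeMillis() - start
}

fun main() = runBlocking {
    println("plain: waited ${cancelAfter100ms(interruptible = false)} ms")
    println("runInterruptible: waited ${cancelAfter100ms(interruptible = true)} ms")
}
```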
The full source code is available in a GitHub gist.
How can hedging be achieved in Kotlin? With Deferred, Flow, Channel, or some better idea? Thanks.
After posting the question, I found that both the tinyurl and is.gd URLs are blocked by SO. That really sucks!
This is basically what the select API is designed for:
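import kotlinx.coroutines.async
import kotlinx.coroutines.cancelChildren
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.selects.select

// A member of the service class, where impls is the list of implementations.
suspend fun getShortUrl(longUrl: String): String? = coroutineScope {
    select<String?> {
        impls.forEach { impl ->
            async {
                impl.getShortUrl(longUrl)
            }.onAwait { it }
        }
    }.also {
        coroutineContext.cancelChildren() // Cancel any requests that are still going.
    }
}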
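Note that this doesn't handle exceptions thrown by the service implementations, and it returns the first completed result even if that result is null. If you want to actually handle those cases, you'll need a supervisorScope with a custom exception handler, and a select loop that filters out unusable results.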
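A minimal sketch of such a filtering loop, assuming suspendable implementations (the `UrlShortener` interface with a `suspend` method and the `hedge` function are assumptions for illustration). It runs the requests in a `supervisorScope`, so one failing request does not cancel its siblings, then repeatedly `select`s over the requests still pending, skipping nulls and failures. Instead of installing a CoroutineExceptionHandler, it catches each failure when awaiting the finished `Deferred`, which is where `async` stores it.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Assumed suspendable contract (the question's interface is blocking).
interface UrlShortener {
    suspend fun getShortUrl(longUrl: String): String?
}

suspend fun hedge(impls: List<UrlShortener>, longUrl: String): String? = supervisorScope {
    // In a supervisorScope a failing child does not cancel its siblings or this scope.
    val pending = impls.map { impl -> async { impl.getShortUrl(longUrl) } }.toMutableList()
    var found: String? = null
    while (found == null && pending.isNotEmpty()) {
        // Suspend until any pending request finishes, successfully or not.
        val done = select<Deferred<String?>> { pending.forEach { d -> d.onJoin { d } } }
        pending.remove(done)
        found = try {
            done.await() // rethrows the request's exception, if it failed
        } catch (e: CancellationException) {
            throw e
        } catch (e: Exception) {
            null // treat a failed service like one that returned null
        }
    }
    pending.forEach { it.cancel() } // stop the requests that are still running
    found
}
```

With one slow success, one fast failure and one fast success, `hedge` returns the fast success and cancels the slow request; if every request fails or returns null, it returns null.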
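If the actual work you're parallelizing consists of network fetches, you should choose an asynchronous networking library so that you can properly use non-blocking coroutines. For example, since version 11 the JDK has provided an asynchronous HTTP client, which you can use as follows: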
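import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse.BodyHandlers
import kotlinx.coroutines.future.await // CompletableFuture.await() from the coroutines JDK integration

val httpClient: HttpClient = HttpClient.newHttpClient()

suspend fun httpGet(url: String): String = httpClient
    .sendAsync(
        HttpRequest.newBuilder().uri(URI.create(url)).build(),
        BodyHandlers.ofString())
    .await()
    .body()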
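To plug this into the hedging code, a shortener can simply wrap `httpGet`. The sketch below is a hypothetical adapter: `HttpShortener` and its `apiPrefix` parameter are illustrative names, the real is.gd and TinyURL endpoints are deliberately left to the caller, and the fetch function is injectable so the adapter can be exercised without network access. `httpGet` is repeated so the block stands alone; `await()` on a `CompletableFuture` comes from the kotlinx-coroutines JDK 8 integration, which recent releases ship in the core artifact.

```kotlin
import java.net.URI
import java.net.URLEncoder
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse.BodyHandlers
import kotlinx.coroutines.future.await
import kotlinx.coroutines.runBlocking

val httpClient: HttpClient = HttpClient.newHttpClient()

// Non-blocking GET: suspends until the JDK 11+ client completes the request.
suspend fun httpGet(url: String): String = httpClient
    .sendAsync(HttpRequest.newBuilder().uri(URI.create(url)).build(), BodyHandlers.ofString())
    .await()
    .body()

// Hypothetical adapter: appends the URL-encoded long URL to a caller-supplied API prefix.
// `fetch` defaults to httpGet but can be replaced, e.g. by a fake in tests.
class HttpShortener(
    private val apiPrefix: String,
    private val fetch: suspend (String) -> String = ::httpGet,
) {
    suspend fun getShortUrl(longUrl: String): String? =
        fetch(apiPrefix + URLEncoder.encode(longUrl, "UTF-8"))
            .trim()
            .takeIf { it.isNotEmpty() } // an empty body counts as "no result"
}

fun main() = runBlocking {
    // Offline demo with a fake fetch; pass a real API prefix and omit `fetch` to go over the network.
    val shortener = HttpShortener("https://shortener.example/api?url=") { url -> "short link for $url" }
    println(shortener.getShortUrl("https://www.google.com"))
}
```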
Here is a function that performs request hedging, given suspendable implementations like the one above:
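import kotlinx.coroutines.CancellationException
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.FlowPreview
import kotlinx.coroutines.flow.*

interface UrlShortener {
    suspend fun getShortUrl(longUrl: String): String? // suspendable this time
}

@OptIn(FlowPreview::class, ExperimentalCoroutinesApi::class) // flatMapMerge is not a stable API yet
class UrlShortenerService(
    private val impls: List<UrlShortener>
) {
    suspend fun getShortUrl(longUrl: String): String? = impls
        .asFlow()
        .flatMapMerge(impls.size) { impl ->
            flow<String?> {
                val result = try {
                    impl.getShortUrl(longUrl)
                } catch (e: CancellationException) {
                    throw e // never swallow cancellation
                } catch (e: Exception) {
                    null // maybe log it, but don't let it propagate
                }
                result?.also { emit(it) }
            }
        }
        .onCompletion { emit(null) }
        .first()
}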
Note the absence of any custom dispatchers: you don't need them for suspendable work. Any dispatcher will do, and all the work can run on a single thread.
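The onCompletion part comes into play when all of your URL shorteners fail. In that case the flatMapMerge stage emits nothing, and without the extra null injected into the flow, first() would throw a NoSuchElementException.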
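The empty-flow case is easy to check in isolation. In the sketch below, two hypothetical flows stand in for the flatMapMerge stage: `first()` on a flow that completes without emitting throws `NoSuchElementException`, `onCompletion { emit(null) }` turns that into a null result, and when a value does arrive, `first()` takes it and the trailing null is never delivered. `firstOrNull()` from kotlinx.coroutines.flow is a shorter alternative for the empty case.

```kotlin
import kotlinx.coroutines.flow.*
import kotlinx.coroutines.runBlocking

// Stand-in for the flatMapMerge stage when every shortener failed or returned null.
fun noResults(): Flow<String?> = emptyFlow()

// Stand-in for the flatMapMerge stage when at least one shortener succeeded.
fun someResults(): Flow<String?> = flowOf("https://sho.rt/a", "https://sho.rt/b")

fun main() = runBlocking {
    // first() on an empty flow fails with NoSuchElementException.
    println(runCatching { noResults().first() }.exceptionOrNull())
    // Appending a null on normal completion gives first() something to return.
    println(noResults().onCompletion { emit(null) }.first())   // null
    println(someResults().onCompletion { emit(null) }.first()) // https://sho.rt/a
    // firstOrNull() yields null for an empty flow without the extra operator.
    println(noResults().firstOrNull())                          // null
}
```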
To test it, I used the following code:
import kotlinx.coroutines.delay
import kotlin.system.measureTimeMillis

class Shortener(
    private val delay: Long
) : UrlShortener {
    override suspend fun getShortUrl(longUrl: String): String? {
        delay(delay * 1000)
        println("Shortener $delay completing")
        if (delay == 1L) {
            throw Exception("failed service")
        }
        if (delay == 2L) {
            return null
        }
        return "shortened after $delay seconds"
    }
}

suspend fun main() {
    val shorteners = listOf(
        Shortener(4),
        Shortener(3),
        Shortener(2),
        Shortener(1)
    )
    measureTimeMillis {
        UrlShortenerService(shorteners).getShortUrl("bla").also {
            println(it)
        }
    }.also {
        println("Took $it ms")
    }
}
This exercises the various failure cases, such as returning null or failing with an exception. For this code I get the following output:
Shortener 1 completing
Shortener 2 completing
Shortener 3 completing
shortened after 3 seconds
Took 3080 ms
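We can see that shorteners 1 and 2 completed but failed, shortener 3 returned a valid response, and shortener 4 was cancelled before completing. I think this meets the requirements.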
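If you can't get away from blocking requests, your implementation will have to start num_impls * num_concurrent_requests threads, which isn't great. But if that's the best you can have, here is an implementation that runs the blocking requests on worker threads while waiting for them in a suspendable, cancellable way. It sends an interrupt signal to the worker threads running the requests, but if your library's IO code is not interruptible, those threads will hang until their requests complete or time out.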
import java.util.concurrent.Executors
import kotlinx.coroutines.channels.Channel

val es = Executors.newCachedThreadPool()

interface UrlShortener {
    fun getShortUrl(longUrl: String): String? // not suspendable!
}

class UrlShortenerService(
    private val impls: List<UrlShortener>
) {
    suspend fun getShortUrl(longUrl: String): String {
        // Buffered so that workers never block on sending, even after we stop receiving.
        val chan = Channel<String?>(impls.size)
        val futures = impls.map { impl ->
            es.submit {
                try {
                    impl.getShortUrl(longUrl)
                } catch (e: Exception) {
                    null
                }.also { chan.trySend(it) } // a no-op once the channel is closed
            }
        }
        try {
            repeat(impls.size) {
                chan.receive()?.also { return it }
            }
            throw Exception("All services failed")
        } finally {
            chan.close()
            futures.forEach { it.cancel(true) } // interrupts workers that are still running
        }
    }
}
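If the blocking library does respond to thread interrupts, a different technique is to stay within structured concurrency and wrap each blocking call in `runInterruptible` on `Dispatchers.IO`, so that cancelling the losing requests interrupts their threads. This is a sketch of that alternative, not the executor-based approach above; `BlockingShortener` and `hedgeBlocking` are hypothetical names. Unlike the executor version, `coroutineScope` waits for the cancelled calls to actually stop, so a call that ignores interrupts will still delay the result.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Same shape as the question's blocking interface (hypothetical name).
fun interface BlockingShortener {
    fun getShortUrl(longUrl: String): String?
}

suspend fun hedgeBlocking(impls: List<BlockingShortener>, longUrl: String): String? = coroutineScope {
    val pending = impls.map { impl ->
        async(Dispatchers.IO) {
            try {
                // Cancelling this coroutine interrupts the thread running the blocking call.
                runInterruptible { impl.getShortUrl(longUrl) }
            } catch (e: CancellationException) {
                throw e
            } catch (e: Exception) {
                null // a failing service counts as "no result"
            }
        }
    }.toMutableList()
    var found: String? = null
    while (found == null && pending.isNotEmpty()) {
        // Resume with whichever call completes first.
        val (done, value) = select<Pair<Deferred<String?>, String?>> {
            pending.forEach { d -> d.onAwait { d to it } }
        }
        pending.remove(done)
        found = value
    }
    pending.forEach { it.cancel() } // interrupts the threads still blocked in getShortUrl
    found
}
```

For example, with a 3-second `Thread.sleep` stand-in, a service that throws, and one that answers after 200 ms, `hedgeBlocking` returns the 200 ms answer well before the sleeper would have finished.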