How to achieve multithreaded request hedging in Kotlin?

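Spring's Reactor has an interesting feature: hedging. It means spawning many requests, taking the first returned result, and automatically cleaning up the other contexts. Josh Long has recently been actively promoting this feature. Googling "Spring reactor hedging" shows relevant results. If anybody is curious, here is the sample code. In short, Flux.first() hides all the underlying hassle, which is very impressive.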

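I wonder how this can be achieved with Kotlin's coroutines and multithreading (maybe with Flow or Channel). I thought of a simple scenario: one service accepts a longUrl, sends it to many URL shortening services (such as IsGd, TinyUrl, ...), and returns the first returned URL (and terminates / cleans up the other thread / coroutine resources).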

There is an interface, UrlShorter, that defines this work:

interface UrlShorter {
  fun getShortUrl(longUrl: String): String?
}

And there are three implementations: one for is.gd, another for tinyUrl, and a third, Dumb implementation that blocks for 10 seconds and returns null:

class IsgdImpl : UrlShorter {
  override fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    // the is.gd API URL is blocked by SO, it sucks. See the underlying gist for the full code
    val url = "https://is.gd/_create.php?format=simple&url=%s".format(URLEncoder.encode(longUrl, "UTF-8"))
    return Request.Get(url).execute().returnContent().asString().also {
      logger.info("returning {}", it)
    }
  }
}

class TinyImpl : UrlShorter {
  override fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    val url = "http://tinyurl.com/_api-create.php?url=$longUrl" // sorry, the URL is blocked by Stack Overflow, see the underlying gist for the full code
    return Request.Get(url).execute().returnContent().asString().also {
      logger.info("returning {}", it)
    }
  }
}

class DumbImpl : UrlShorter {
  override fun getShortUrl(longUrl: String): String? {
    logger.info("running : {}", Thread.currentThread().name)
    TimeUnit.SECONDS.sleep(10)
    return null
  }
}

And there is a UrlShorterService that takes all the UrlShorter implementations, tries to spawn coroutines, and gets the first result.

Here is what I came up with:

@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterService(private val impls: List<UrlShorter>) {


  private val es: ExecutorService = Executors.newFixedThreadPool(impls.size)
  private val esDispatcher = es.asCoroutineDispatcher()

  suspend fun getShortUrl(longUrl: String): String {
    return method1(longUrl) // there are other methods , with different ways...
  }

  private inline fun <T, R : Any> Iterable<T>.firstNotNullResult(transform: (T) -> R?): R? {
    for (element in this) {
      val result = transform(element)
      if (result != null) return result
    }
    return null
  }

The client is simple too:

@ExperimentalCoroutinesApi
@FlowPreview
class UrlShorterServiceTest {

  @Test
  fun testHedging() {
    val impls = listOf(DumbImpl(), IsgdImpl(), TinyImpl()) // Dumb first
    val service = UrlShorterService(impls)
    runBlocking {
      service.getShortUrl("https://www.google.com").also {
        logger.info("result = {}", it)
      }
    }
  }
}

Notice that I put DumbImpl first, because I want it to be spawned first and to block in its thread. The other two implementations can both get a result.

OK, here comes the problem: how can hedging be achieved in Kotlin? I tried the following methods:

  private suspend fun method1(longUrl: String): String {
    return impls.asSequence().asFlow().flatMapMerge(impls.size) { impl ->
      flow {
        impl.getShortUrl(longUrl)?.also {
          emit(it)
        }
      }.flowOn(esDispatcher)
    }.first()
      .also { esDispatcher.cancelChildren() } // doesn't impact the result
  }

I expected method1 to work, but it takes the full 10 seconds to execute:

00:56:09,253 INFO  TinyImpl - running : pool-1-thread-3
00:56:09,254 INFO  DumbImpl - running : pool-1-thread-1
00:56:09,253 INFO  IsgdImpl - running : pool-1-thread-2
00:56:11,150 INFO  TinyImpl - returning // tiny url blocked by SO , it sucks
00:56:13,604 INFO  IsgdImpl - returning // idGd url blocked by SO , it sucks
00:56:19,261 INFO  UrlShorterServiceTest$testHedging - result = // tiny url blocked by SO , it sucks

Then I thought of other approaches, method2, method3, method4, method5 ..., but none of them work:

  /**
   * 00:54:29,035 INFO  IsgdImpl - running : pool-1-thread-3
   * 00:54:29,036 INFO  DumbImpl - running : pool-1-thread-2
   * 00:54:29,035 INFO  TinyImpl - running : pool-1-thread-1
   * 00:54:30,228 INFO  TinyImpl - returning // tiny url blocked by SO , it sucks
   * 00:54:30,797 INFO  IsgdImpl - returning // idGd url blocked by SO , it sucks
   * 00:54:39,046 INFO  UrlShorterServiceTest$testHedging - result = // idGd url blocked by SO , it sucks
   */
  private suspend fun method2(longUrl: String): String {
    return withContext(esDispatcher) {
      impls.map { impl ->
        async(esDispatcher) {
          impl.getShortUrl(longUrl)
        }
      }.firstNotNullResult { it.await() } ?: longUrl
    }
  }
  /**
   * 00:52:30,681 INFO  IsgdImpl - running : pool-1-thread-2
   * 00:52:30,682 INFO  DumbImpl - running : pool-1-thread-1
   * 00:52:30,681 INFO  TinyImpl - running : pool-1-thread-3
   * 00:52:31,838 INFO  TinyImpl - returning // tiny url blocked by SO , it sucks
   * 00:52:33,721 INFO  IsgdImpl - returning // idGd url blocked by SO , it sucks
   * 00:52:40,691 INFO  UrlShorterServiceTest$testHedging - result = // idGd url blocked by SO , it sucks
   */
  private suspend fun method3(longUrl: String): String {
    return coroutineScope {
      impls.map { impl ->
        async(esDispatcher) {
          impl.getShortUrl(longUrl)
        }
      }.firstNotNullResult { it.await() } ?: longUrl
    }
  }
  /**
   * 01:58:56,930 INFO  TinyImpl - running : pool-1-thread-1
   * 01:58:56,933 INFO  DumbImpl - running : pool-1-thread-2
   * 01:58:56,930 INFO  IsgdImpl - running : pool-1-thread-3
   * 01:58:58,411 INFO  TinyImpl - returning // tiny url blocked by SO , it sucks
   * 01:58:59,026 INFO  IsgdImpl - returning // idGd url blocked by SO , it sucks
   * 01:59:06,942 INFO  UrlShorterServiceTest$testHedging - result =  // idGd url blocked by SO , it sucks
   */
  private suspend fun method4(longUrl: String): String {
    return withContext(esDispatcher) {
      impls.map { impl ->
        async {
          impl.getShortUrl(longUrl)
        }
      }.firstNotNullResult { it.await() } ?: longUrl
    }
  }

I'm not familiar with Channel, so apologies for the exception below ↓

  /**
   * 01:29:44,460 INFO  UrlShorterService$method5 - channel closed
   * 01:29:44,461 INFO  DumbImpl - running : pool-1-thread-2
   * 01:29:44,460 INFO  IsgdImpl - running : pool-1-thread-3
   * 01:29:44,466 INFO  TinyImpl - running : pool-1-thread-1
   * 01:29:45,765 INFO  TinyImpl - returning // tiny url blocked by SO , it sucks
   * 01:29:46,339 INFO  IsgdImpl - returning // idGd url blocked by SO , it sucks
   *
   * kotlinx.coroutines.channels.ClosedSendChannelException: Channel was closed
   *
   */
  private suspend fun method5(longUrl: String): String {
    val channel = Channel<String>()

    withContext(esDispatcher) {
      impls.forEach { impl ->
        launch {
          impl.getShortUrl(longUrl)?.also {
            channel.send(it)
          }
        }
      }
      channel.close()
      logger.info("channel closed")
    }

    return channel.consumeAsFlow().first()
  }

OK, I don't know whether there are other ways, but none of the above work: all of them block for at least 10 seconds (blocked by DumbImpl).

The whole source code can be found in a GitHub gist.

How can hedging be achieved in Kotlin? With Deferred, Flow, or Channel, or any other better idea? Thanks.

After submitting the question, I found that the tinyurl and isGd URLs are all blocked by SO. It really sucks!

This is essentially what the select API was designed for:

coroutineScope {
    select<String?> {
        impls.forEach { impl ->
            async {
               impl.getShortUrl(longUrl)
            }.onAwait { it }
        }
    }.also {
        // Cancel any requests that are still going, then return the selected result.
        coroutineContext.cancelChildren()
    }
}

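Note that this doesn't handle exceptions thrown by the service implementations. If you actually want to handle them, you'll need to use a supervisorScope with a custom exception handler and a filtering select loop.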
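
As a rough illustration of such a filtering select loop, here is a minimal sketch. It assumes the services are exposed as suspending functions; the SuspendingShortener alias and the firstSuccessful name are hypothetical and only serve this example. Instead of a custom exception handler, it simply treats a failed await as "no result".

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.selects.select

// Hypothetical suspending shortener type, used only for this sketch.
typealias SuspendingShortener = suspend (String) -> String?

suspend fun firstSuccessful(impls: List<SuspendingShortener>, longUrl: String): String? =
    supervisorScope {
        // Children of a supervisor scope fail independently of each other.
        val pending = impls.map { impl -> async { impl(longUrl) } }.toMutableList()
        var result: String? = null
        while (result == null && pending.isNotEmpty()) {
            // onJoin selects when a request finishes, whether it succeeded or failed.
            val finished = select<Deferred<String?>> {
                pending.forEach { d -> d.onJoin { d } }
            }
            pending.remove(finished)
            // Both an exception and a null return count as "no result from this service".
            result = runCatching { finished.await() }.getOrNull()
        }
        pending.forEach { it.cancel() } // drop the requests that are still running
        result
    }
```

The loop keeps selecting until one request yields a non-null value or all of them have finished, so a fast failure no longer decides the outcome.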

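If the actual work you want to perform in parallel consists of network fetches, you should choose an async networking library so that you can properly use non-blocking coroutines. For example, as of version 11, the JDK provides an async HTTP client which you can use as follows: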

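import java.net.URI
import java.net.http.HttpClient
import java.net.http.HttpRequest
import java.net.http.HttpResponse.BodyHandlers
import kotlinx.coroutines.future.await

val httpClient: HttpClient = HttpClient.newHttpClient()

suspend fun httpGet(url: String): String = httpClient
        .sendAsync(
                HttpRequest.newBuilder().uri(URI.create(url)).build(),
                BodyHandlers.ofString())
        .await()
        .body()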
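
For illustration, a suspendable shortener built on such a fetch function could look like the sketch below. The HttpUrlShortener class, its endpoint parameter, and the convention that the service answers with the short URL as plain text are all assumptions made for this example, not part of any real service's API.

```kotlin
import java.net.URLEncoder

// Suspending counterpart of the question's interface (assumed shape).
interface UrlShortener {
    suspend fun getShortUrl(longUrl: String): String?
}

// Hypothetical implementation: `endpoint` is the service URL up to and including "url=",
// and `fetch` is any suspending GET function, for example the httpGet function above.
class HttpUrlShortener(
    private val endpoint: String,
    private val fetch: suspend (String) -> String
) : UrlShortener {
    override suspend fun getShortUrl(longUrl: String): String? {
        val encoded = URLEncoder.encode(longUrl, "UTF-8")
        // An empty response body is treated as "no result".
        return fetch(endpoint + encoded).trim().ifEmpty { null }
    }
}
```

Passing the fetch function in as a parameter keeps the class testable without network access.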

Here is a function that accomplishes request hedging, given a suspendable implementation like the one above:

class UrlShortenerService(
        private val impls: List<UrlShortener>
) {
    suspend fun getShortUrl(longUrl: String): String? = impls
            .asFlow()
            .flatMapMerge(impls.size) { impl ->
                flow<String?> {
                    try {
                        impl.getShortUrl(longUrl)?.also { emit(it) }
                    }
                    catch (e: CancellationException) {
                        throw e // let cancellation propagate
                    }
                    catch (e: Exception) { 
                        // maybe log it, but don't let it propagate
                    }
                }
            }
            .onCompletion { emit(null) }
            .first()
}

Note the absence of any custom dispatchers; you don't need them for suspendable work. Any dispatcher will do, and all the work can run on a single thread.
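
A small sketch can make the single-thread point concrete. The runOnSingleThread helper is hypothetical; it uses runBlocking, which confines the coroutines it launches to the calling thread, and delay as a stand-in for a suspending request.

```kotlin
import kotlinx.coroutines.*
import kotlin.system.measureTimeMillis

// Runs `count` suspending "requests" concurrently and returns
// (number of distinct threads used, elapsed milliseconds).
fun runOnSingleThread(count: Int, delayMs: Long): Pair<Int, Long> {
    val threads = mutableSetOf<Thread>()
    val elapsed = measureTimeMillis {
        runBlocking {
            repeat(count) {
                launch {
                    delay(delayMs) // suspends without blocking the thread
                    threads += Thread.currentThread()
                }
            }
        }
    }
    return threads.size to elapsed
}
```

Calling runOnSingleThread(3, 400) should report one thread and an elapsed time close to 400 ms rather than 1200 ms, because the waits overlap.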

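The onCompletion part comes into play when all your URL shorteners fail. In that case the flatMapMerge stage doesn't emit anything, and without the extra null injected into the flow, first() would fail with a NoSuchElementException instead of returning null.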
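
The effect of the injected null can be checked in isolation. In the sketch below, firstWithFallback is a hypothetical helper that applies the same onCompletion step to an arbitrary flow; as far as I know, first() on a flow that completes without emitting throws NoSuchElementException in current kotlinx.coroutines versions.

```kotlin
import kotlinx.coroutines.flow.*

// Same pattern as in the service above: append a null when the flow completes,
// so that first() always has something to return.
suspend fun firstWithFallback(flow: Flow<String?>): String? =
    flow.onCompletion { emit(null) }.first()

// Without the fallback, an empty flow makes first() throw.
suspend fun firstFailsOnEmpty(): Boolean =
    runCatching { emptyFlow<String?>().first() }
        .exceptionOrNull() is NoSuchElementException
```

If null is the only "nothing found" signal you need, firstOrNull() is a shorter alternative to the onCompletion step.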

To test it, I used the following code:

class Shortener(
        private val delay: Long
) : UrlShortener {
    override suspend fun getShortUrl(longUrl: String): String? {
        delay(delay * 1000)
        println("Shortener $delay completing")
        if (delay == 1L) {
            throw Exception("failed service")
        }
        if (delay == 2L) {
            return null
        }
        return "shortened after $delay seconds"
    }
}

suspend fun main() {
    val shorteners = listOf(
            Shortener(4),
            Shortener(3),
            Shortener(2),
            Shortener(1)
    )
    measureTimeMillis {
        UrlShortenerService(shorteners).getShortUrl("bla").also {
            println(it)
        }
    }.also {
        println("Took $it ms")
    }
}

This exercises the various failure cases, such as returning null or failing with an exception. For this code I get the following output:

Shortener 1 completing
Shortener 2 completing
Shortener 3 completing
shortened after 3 seconds
Took 3080 ms

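We can see that shorteners 1 and 2 completed but failed, shortener 3 returned a valid response, and shortener 4 was cancelled before completing. I think this matches the requirements.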


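If you can't move away from blocking requests, your implementation will have to start num_impls * num_concurrent_requests threads, which is not great. However, if that's the best you can have, here's an implementation that hedges blocking requests but awaits them suspendably and cancellably. It sends an interrupt signal to the worker threads running the requests, but if your library's IO code is non-interruptible, these threads will hang until their requests complete or time out.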

val es = Executors.newCachedThreadPool()

interface UrlShortener {
    fun getShortUrl(longUrl: String): String? // not suspendable!
}

class UrlShortenerService(
        private val impls: List<UrlShortener>
) {
    suspend fun getShortUrl(longUrl: String): String {
        val chan = Channel<String?>()
        val futures = impls.map { impl -> es.submit {
            try {
                impl.getShortUrl(longUrl)
            } catch (e: Exception) {
                null
            }.also { runBlocking { chan.send(it) } }
        } }
        try {
            (1..impls.size).forEach { _ ->
                chan.receive()?.also { return it }
            }
            throw Exception("All services failed")
        } finally {
            chan.close()
            futures.forEach { it.cancel(true) }
        }
    }
}
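
As an alternative sketch of the same idea, kotlinx.coroutines (1.3.6 and later, to my knowledge) offers runInterruptible, which interrupts the thread running a blocking block when its coroutine is cancelled. The BlockingShortener type and the hedgeBlocking function are hypothetical names for this example, and the same caveat applies: IO code that ignores interrupts keeps its thread busy until it finishes.

```kotlin
import kotlinx.coroutines.*
import kotlinx.coroutines.channels.Channel

// Hypothetical blocking service type, mirroring the non-suspending interface above.
fun interface BlockingShortener {
    fun getShortUrl(longUrl: String): String?
}

suspend fun hedgeBlocking(impls: List<BlockingShortener>, longUrl: String): String? =
    coroutineScope {
        // Buffered so that workers never suspend on send.
        val results = Channel<String?>(impls.size)
        val jobs = impls.map { impl ->
            launch(Dispatchers.IO) {
                val shortUrl = try {
                    // Cancelling this coroutine interrupts the thread running the block.
                    runInterruptible { impl.getShortUrl(longUrl) }
                } catch (e: CancellationException) {
                    throw e
                } catch (e: Exception) {
                    null // a failed service counts as "no result"
                }
                results.send(shortUrl)
            }
        }
        var winner: String? = null
        for (i in impls.indices) {
            winner = results.receive()
            if (winner != null) break
        }
        jobs.forEach { it.cancel() } // interrupt the requests that are still running
        winner
    }
```

With a 10-second Thread.sleep implementation in the list, the call should return as soon as the first real result arrives, since sleep responds to interruption.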