Kotlin 协程 vs CompletableFuture
Kotlin Coroutines vs CompletableFuture
谁能解释一下为什么人们应该使用协程?是否有一些协程代码示例显示比常规 java 并发代码 更好的完成时间(没有神奇的 delay() 函数,没有人在生产中使用 delay()
)?
在我个人的示例中,协程(第 1 行)与 java 代码(第 2 行)相比很糟糕。也许我做错了什么?
示例:
import kotlinx.coroutines.*
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future
@ExperimentalCoroutinesApi
fun main() = runBlocking {
val begin = Instant.now().toEpochMilli()
val jobs = List(150_000) {
GlobalScope.launch { print(getText().await()) } // :1
// CompletableFuture.supplyAsync { "." }.thenAccept { print(it) } // :2
}
jobs.forEach { it.join() }
println(Instant.now().toEpochMilli() - begin)
}
fun getText(): Future<String> {
return CompletableFuture.supplyAsync {
"."
}
}
@ExperimentalCoroutinesApi
suspend fun <T> Future<T>.await(): T = suspendCancellableCoroutine { cont ->
cont.resume(this.get()) {
this.cancel(true)
}
}
附加问题:
为什么我应该创建这个协程包装器 await()
?似乎没有改进代码的协程版本,否则 get()
方法在 inappropriate blocking method call
?
上抱怨
协程的目标不是 "better completion time." 目标 - 老实说,它取得了相当大的成功 - 是协程 更易于使用 。
也就是说,您在代码中所做的根本不是比较两种替代方法速度的好方法。比较 Java 中事物的速度并获得真实的结果 非常困难 ,在尝试之前您应该至少阅读 How do I write a correct micro-benchmark in Java?。您当前尝试比较两段 Java 代码的方式将 欺骗您 关于代码的实际性能行为。
要回答您的附加问题,答案是您不应该 创建那个 await
方法。您不应该将 get()
— 或 java.util.concurrent.Future
— 与协程代码一起使用,无论它是在 suspendCancellableCoroutine
中还是其他地方。如果您想使用 CompletableFuture
,请使用 the provided library 从协程代码中与其交互。
切换到这个 kotlinx-coroutines-jdk8 库并将 sleep(1) 添加到我的 getText()
函数后
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
import java.time.Instant
import java.util.concurrent.CompletableFuture
fun main() = runBlocking {
val begin = Instant.now().toEpochMilli()
val jobs = List(150_000) {
GlobalScope.launch { print(getText().await()) } // :1
// getText().thenAccept { print(it) } // :2
}
jobs.forEach { it.join() }
println(Instant.now().toEpochMilli() - begin)
}
fun getText(): CompletableFuture<String> {
return CompletableFuture.supplyAsync {
sleep(1)
"."
}
}
我使协程版本比 java 版本更快!!!显然,当有一些延迟时,这个额外的协程层变得合理。
这是我用于基准测试的代码的清理版本。请注意,我从测量代码中删除了 print
,因为打印本身是一个重量级操作,涉及互斥体、JNI、阻塞输出流等。相反,我更新了一个 volatile 变量。
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.TimeUnit.NANOSECONDS
@Volatile
var total = 0
@ExperimentalCoroutinesApi
fun main() = runBlocking {
println("Warmup")
measure(20_000)
println("Measure")
val begin = System.nanoTime()
measure(40_000)
println("Completed in ${NANOSECONDS.toMillis(System.nanoTime() - begin)} ms")
}
fun getText(): CompletableFuture<Int> {
return CompletableFuture.supplyAsync {
sleep(1)
1
}
}
suspend fun measure(count: Int) {
val jobs = List(count) {
GlobalScope.launch { total += getText().await() } // :1
// getText().thenAccept { total += it } // :2
}
jobs.forEach { it.join() }
}
我的结果是第一个案例为 6.5 秒,而第二个案例为 7 秒。这是一个 7% 的差异,它可能非常特定于这个确切的场景,而不是您通常认为两种方法之间的差异。
选择协程而不是基于 CompletionStage
的编程的原因绝对不是那 7%,而是关于便利性的巨大差异。为了明白我的意思,我邀请您重写 main
函数,只调用 computeAsync
,而不使用 future.await()
:
suspend fun main() {
try {
if (compute(1) == 2) {
println(compute(4))
} else {
println(compute(7))
}
} catch (e: RuntimeException) {
println("Got an error")
println(compute(8))
}
}
fun main_no_coroutines() {
// Let's see how it might look!
}
fun computeAsync(input: Int): CompletableFuture<Int> {
return CompletableFuture.supplyAsync {
sleep(1)
if (input == 7) {
throw RuntimeException("Input was 7")
}
input % 3
}
}
suspend fun compute(input: Int) = computeAsync(input).await()
我的 compute
方法的 2 个版本没有重写方法签名。我想我明白你的意思了。使用协程,我们可以以熟悉的顺序风格编写复杂的并行代码。但是协程 await
包装器由于暂停技术而无法使这项工作,它只是实现了与我所做的相同的逻辑。
import java.lang.Thread.sleep
import java.util.concurrent.CompletableFuture
fun main() {
try {
if (compute(1) == 2) {
println(compute(4))
} else {
println(compute(7))
}
} catch (e: RuntimeException) {
println("Got an error")
println(compute(8))
}
}
fun compute(input: Int): Int {
var exception: Throwable? = null
val supplyAsync = CompletableFuture.supplyAsync {
sleep(1)
if (input == 7) {
throw RuntimeException("Input was 7")
}
input % 3
}.exceptionally {
exception = it
throw it
}
while (supplyAsync.isDone.not()) {}
return if (supplyAsync.isCompletedExceptionally) {
throw exception!!
} else supplyAsync.get()
}
fun compute2(input: Int): Int {
try {
return CompletableFuture.supplyAsync {
sleep(1)
if (input == 7) {
throw RuntimeException("Input was 7")
}
input % 3
}.get()
} catch (ex: Exception) {
throw ex.cause!!
}
}
谁能解释一下为什么人们应该使用协程?是否有一些协程代码示例显示比常规 java 并发代码 更好的完成时间(没有神奇的 delay() 函数,没有人在生产中使用 delay()
)?
在我个人的示例中,协程(第 1 行)与 java 代码(第 2 行)相比很糟糕。也许我做错了什么?
示例:
import kotlinx.coroutines.*
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future
@ExperimentalCoroutinesApi
fun main() = runBlocking {
val begin = Instant.now().toEpochMilli()
val jobs = List(150_000) {
GlobalScope.launch { print(getText().await()) } // :1
// CompletableFuture.supplyAsync { "." }.thenAccept { print(it) } // :2
}
jobs.forEach { it.join() }
println(Instant.now().toEpochMilli() - begin)
}
fun getText(): Future<String> {
return CompletableFuture.supplyAsync {
"."
}
}
@ExperimentalCoroutinesApi
suspend fun <T> Future<T>.await(): T = suspendCancellableCoroutine { cont ->
cont.resume(this.get()) {
this.cancel(true)
}
}
附加问题:
为什么我应该创建这个协程包装器 await()
?似乎没有改进代码的协程版本,否则 get()
方法在 inappropriate blocking method call
?
协程的目标不是 "better completion time." 目标 - 老实说,它取得了相当大的成功 - 是协程 更易于使用 。
也就是说,您在代码中所做的根本不是比较两种替代方法速度的好方法。比较 Java 中事物的速度并获得真实的结果 非常困难 ,在尝试之前您应该至少阅读 How do I write a correct micro-benchmark in Java?。您当前尝试比较两段 Java 代码的方式将 欺骗您 关于代码的实际性能行为。
要回答您的附加问题,答案是您不应该 创建那个 await
方法。您不应该将 get()
— 或 java.util.concurrent.Future
— 与协程代码一起使用,无论它是在 suspendCancellableCoroutine
中还是其他地方。如果您想使用 CompletableFuture
,请使用 the provided library 从协程代码中与其交互。
切换到这个 kotlinx-coroutines-jdk8 库并将 sleep(1) 添加到我的 getText()
函数后
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
import java.time.Instant
import java.util.concurrent.CompletableFuture
fun main() = runBlocking {
val begin = Instant.now().toEpochMilli()
val jobs = List(150_000) {
GlobalScope.launch { print(getText().await()) } // :1
// getText().thenAccept { print(it) } // :2
}
jobs.forEach { it.join() }
println(Instant.now().toEpochMilli() - begin)
}
fun getText(): CompletableFuture<String> {
return CompletableFuture.supplyAsync {
sleep(1)
"."
}
}
我使协程版本比 java 版本更快!!!显然,当有一些延迟时,这个额外的协程层变得合理。
这是我用于基准测试的代码的清理版本。请注意,我从测量代码中删除了 print
,因为打印本身是一个重量级操作,涉及互斥体、JNI、阻塞输出流等。相反,我更新了一个 volatile 变量。
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.TimeUnit.NANOSECONDS
@Volatile
var total = 0
@ExperimentalCoroutinesApi
fun main() = runBlocking {
println("Warmup")
measure(20_000)
println("Measure")
val begin = System.nanoTime()
measure(40_000)
println("Completed in ${NANOSECONDS.toMillis(System.nanoTime() - begin)} ms")
}
fun getText(): CompletableFuture<Int> {
return CompletableFuture.supplyAsync {
sleep(1)
1
}
}
suspend fun measure(count: Int) {
val jobs = List(count) {
GlobalScope.launch { total += getText().await() } // :1
// getText().thenAccept { total += it } // :2
}
jobs.forEach { it.join() }
}
我的结果是第一个案例为 6.5 秒,而第二个案例为 7 秒。这是一个 7% 的差异,它可能非常特定于这个确切的场景,而不是您通常认为两种方法之间的差异。
选择协程而不是基于 CompletionStage
的编程的原因绝对不是那 7%,而是关于便利性的巨大差异。为了明白我的意思,我邀请您重写 main
函数,只调用 computeAsync
,而不使用 future.await()
:
suspend fun main() {
try {
if (compute(1) == 2) {
println(compute(4))
} else {
println(compute(7))
}
} catch (e: RuntimeException) {
println("Got an error")
println(compute(8))
}
}
fun main_no_coroutines() {
// Let's see how it might look!
}
fun computeAsync(input: Int): CompletableFuture<Int> {
return CompletableFuture.supplyAsync {
sleep(1)
if (input == 7) {
throw RuntimeException("Input was 7")
}
input % 3
}
}
suspend fun compute(input: Int) = computeAsync(input).await()
我的 compute
方法的 2 个版本没有重写方法签名。我想我明白你的意思了。使用协程,我们可以以熟悉的顺序风格编写复杂的并行代码。但是协程 await
包装器由于暂停技术而无法使这项工作,它只是实现了与我所做的相同的逻辑。
import java.lang.Thread.sleep
import java.util.concurrent.CompletableFuture
fun main() {
try {
if (compute(1) == 2) {
println(compute(4))
} else {
println(compute(7))
}
} catch (e: RuntimeException) {
println("Got an error")
println(compute(8))
}
}
fun compute(input: Int): Int {
var exception: Throwable? = null
val supplyAsync = CompletableFuture.supplyAsync {
sleep(1)
if (input == 7) {
throw RuntimeException("Input was 7")
}
input % 3
}.exceptionally {
exception = it
throw it
}
while (supplyAsync.isDone.not()) {}
return if (supplyAsync.isCompletedExceptionally) {
throw exception!!
} else supplyAsync.get()
}
fun compute2(input: Int): Int {
try {
return CompletableFuture.supplyAsync {
sleep(1)
if (input == 7) {
throw RuntimeException("Input was 7")
}
input % 3
}.get()
} catch (ex: Exception) {
throw ex.cause!!
}
}