Kotlin 协程 vs CompletableFuture

Kotlin Coroutines vs CompletableFuture

谁能解释一下为什么人们应该使用协程?是否有一些协程代码示例显示比常规 java 并发代码 更好的完成时间(没有神奇的 delay() 函数,没有人在生产中使用 delay()

在我个人的示例中,协程(第 1 行)与 java 代码(第 2 行)相比很糟糕。也许我做错了什么?

示例:

import kotlinx.coroutines.*
import java.time.Instant
import java.util.concurrent.CompletableFuture
import java.util.concurrent.Future

@ExperimentalCoroutinesApi
fun main() = runBlocking {
    val begin = Instant.now().toEpochMilli()
    val jobs = List(150_000) {
        GlobalScope.launch { print(getText().await()) } // :1
//        CompletableFuture.supplyAsync { "." }.thenAccept { print(it) } // :2
    }
    jobs.forEach { it.join() }
    println(Instant.now().toEpochMilli() - begin)
}

fun getText(): Future<String> {
    return CompletableFuture.supplyAsync {
        "."
    }
}

@ExperimentalCoroutinesApi
suspend fun <T> Future<T>.await(): T = suspendCancellableCoroutine { cont ->
    cont.resume(this.get()) {
        this.cancel(true)
    }
}

附加问题:

为什么我应该创建这个协程包装器 await()?似乎没有改进代码的协程版本,否则 get() 方法在 inappropriate blocking method call?

上抱怨

协程的目标不是 "better completion time." 目标 - 老实说,它取得了相当大的成功 - 是协程 更易于使用

也就是说,您在代码中所做的根本不是比较两种替代方法速度的好方法。比较 Java 中事物的速度并获得真实的结果 非常困难 ,在尝试之前您应该至少阅读 How do I write a correct micro-benchmark in Java?。您当前尝试比较两段 Java 代码的方式将 欺骗您 关于代码的实际性能行为。

要回答您的附加问题,答案是您不应该 创建那个 await 方法。您不应该将 get() — 或 java.util.concurrent.Future — 与协程代码一起使用,无论它是在 suspendCancellableCoroutine 中还是其他地方。如果您想使用 CompletableFuture,请使用 the provided library 从协程代码中与其交互。

切换到这个 kotlinx-coroutines-jdk8 库并将 sleep(1) 添加到我的 getText() 函数后

import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
import java.time.Instant
import java.util.concurrent.CompletableFuture

fun main() = runBlocking {
    val begin = Instant.now().toEpochMilli()
    val jobs = List(150_000) {
        GlobalScope.launch { print(getText().await()) } // :1
//        getText().thenAccept { print(it) } // :2
    }
    jobs.forEach { it.join() }
    println(Instant.now().toEpochMilli() - begin)
}

fun getText(): CompletableFuture<String> {
    return CompletableFuture.supplyAsync {
        sleep(1)
        "."
    }
}

我使协程版本比 java 版本更快!!!显然,当有一些延迟时,这个额外的协程层变得合理。

这是我用于基准测试的代码的清理版本。请注意,我从测量代码中删除了 print,因为打印本身是一个重量级操作,涉及互斥体、JNI、阻塞输出流等。相反,我更新了一个 volatile 变量。

import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.future.await
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import java.lang.Thread.sleep
import java.util.concurrent.CompletableFuture
import java.util.concurrent.CompletionStage
import java.util.concurrent.TimeUnit.NANOSECONDS

@Volatile
var total = 0

@ExperimentalCoroutinesApi
fun main() = runBlocking {
    println("Warmup")
    measure(20_000)
    println("Measure")
    val begin = System.nanoTime()
    measure(40_000)
    println("Completed in ${NANOSECONDS.toMillis(System.nanoTime() - begin)} ms")
}

fun getText(): CompletableFuture<Int> {
    return CompletableFuture.supplyAsync {
        sleep(1)
        1
    }
}

suspend fun measure(count: Int) {
    val jobs = List(count) {
        GlobalScope.launch { total += getText().await() } // :1
//        getText().thenAccept { total += it } // :2
    }
    jobs.forEach { it.join() }
}

我的结果是第一个案例为 6.5 秒,而第二个案例为 7 秒。这是一个 7% 的差异,它可能非常特定于这个确切的场景,而不是您通常认为两种方法之间的差异。

选择协程而不是基于 CompletionStage 的编程的原因绝对不是那 7%,而是关于便利性的巨大差异。为了明白我的意思,我邀请您重写 main 函数,只调用 computeAsync,而不使用 future.await():

suspend fun main() {
    try {
        if (compute(1) == 2) {
            println(compute(4))
        } else {
            println(compute(7))
        }
    } catch (e: RuntimeException) {
        println("Got an error")
        println(compute(8))
    }
}

fun main_no_coroutines() {
    // Let's see how it might look!
}

fun computeAsync(input: Int): CompletableFuture<Int> {
    return CompletableFuture.supplyAsync {
        sleep(1)
        if (input == 7) {
            throw RuntimeException("Input was 7")
        }
        input % 3
    }
}

suspend fun compute(input: Int) = computeAsync(input).await()

我的 compute 方法的 2 个版本没有重写方法签名。我想我明白你的意思了。使用协程,我们可以以熟悉的顺序风格编写复杂的并行代码。但是协程 await 包装器由于暂停技术而无法使这项工作,它只是实现了与我所做的相同的逻辑。

import java.lang.Thread.sleep
import java.util.concurrent.CompletableFuture

fun main() {
    try {
        if (compute(1) == 2) {
            println(compute(4))
        } else {
            println(compute(7))
        }
    } catch (e: RuntimeException) {
        println("Got an error")
        println(compute(8))
    }
}

fun compute(input: Int): Int {
    var exception: Throwable? = null
    val supplyAsync = CompletableFuture.supplyAsync {
        sleep(1)
        if (input == 7) {
            throw RuntimeException("Input was 7")
        }
        input % 3
    }.exceptionally {
        exception = it
        throw it
    }
    while (supplyAsync.isDone.not()) {}
    return if (supplyAsync.isCompletedExceptionally) {
        throw exception!!
    } else supplyAsync.get()
}

fun compute2(input: Int): Int {
    try {
        return CompletableFuture.supplyAsync {
            sleep(1)
            if (input == 7) {
                throw RuntimeException("Input was 7")
            }
            input % 3
        }.get()
    } catch (ex: Exception) {
        throw ex.cause!!
    }
}