Kotlin:阻塞协程与非阻塞 I/O
Kotlin: Blocking coroutines with non-blocking I/O
我正在尝试使用 Kotlin 协程来处理非阻塞 I/O。场景如下:
- 从线程 1 上的异步回调 运行 接收数据。
- 在线程 2 中等待此数据然后使用它。
我当前的代码如下所示(为简洁起见进行了简化):
private var latch = CountDownLatch(1)
private var data: Any? = null
// Async callback from non-blocking I/O
fun onReceive(data: Any) {
currentData = data
latch.countDown()
}
// Wait and consume data
fun getData(): Any? {
latch.await()
latch = CountDownLatch(1)
return currentData
}
fun processData() {
launch(CommonPool) {
while (true) {
val data = getData()
// Consume data
}
}
}
据我了解,Kotlin协程应该可以帮助我摆脱CountDownLatch。看完this (awesome) guide,我能想到的是这样的:
// Wait and consume data
fun getData() = async(CommonPool) {
latch.await()
latch = CountDownLatch(1)
currentData
}
fun processData() {
launch(CommonPool) {
while (true) {
runBlocking {
val data = getData().await()
// Consume data
}
}
}
}
我也尝试过 Pipelines,结果相似。我显然不明白如何使用这些功能。
你没有说在 onReceive()
中收到的数据是否可以并行处理。这是主要问题。如果是,您可以在 onReceive()
中完成。如果不允许,则让对 onReceive()
的每次调用都在 CommonPool
上启动一个任务,而无需任何协程。如果他们应该按顺序处理,那么最简单的方法是启动一个内部循环的线程:
fun onReceive(data: Any) {
queue.put(data);
}
....
// loop in a thread
while(true) {
data = queue.take();
processData(data);
}
同样,不需要协程。
通常,协程是语法糖,用于将异步程序表示为同步程序。我不认为你的程序是使用协程的案例。
在使用 Android 进行开发时使用 CountDownLatch
是一种非常常见的模式,有时您希望在处理 BroadcastReceivers
和 CountDownLatch
时使异步实现同步] 非常方便。
private suspend fun yourSuspendMethod() {
val job = GlobalScope.async {
val latch = CountDownLatch(1)
val watcher = object : BroadcastReceiver() {
override fun onReceive(context: Context?, intent: Intent?) {
if // your logic
latch.countDown()
}
}
try {
mContext?.registerReceiver(watcher, IntentFilter(...))
//call a method that will trigger the broadcast receiver
if (!latch.await(5, TimeUnit.SECONDS)) {
throw Exception("Failed .... on latch's timeout")
}
} finally {
mContext?.unregisterReceiver(watcher)
}
}
job.await()
}
这里有一件非常重要的事情,不要更改 CoroutineScope 的上下文,否则它们将 运行 在一个完全不同的地方,按照我在上面留下的方式成为scope/context.
[编辑]
我决定对这个问题多加思考,避免使用 CountDownLatch
。闩锁的问题是,当您调用 latch.await
时,它会停止当前线程,因此如果这是来自主线程,则主线程将等待并超时,因为它没有给要调用的接收器。解决这个问题的一种方法是使用我上面使用的示例。
我在上面的例子中忘记的一件事是,如果你想单元测试和同步调用者的上下文,你需要注入上下文。如果您决定这样做,您的实现将变得更加复杂,并且您将无法使用协程的全部功能,因为您将创建额外的线程。
因此,解决方案是使用 withTimeout
+ suspendCancellableCoroutine
的组合,您可以使用此扩展名:
suspend inline fun <T> suspendCoroutineWithTimeout(
timeout: Long,
crossinline block: (Continuation<T>) -> Unit
) = withTimeout(timeout) {
suspendCancellableCoroutine(block = block)
}
您的方法将如下所示:
private suspend fun yourSuspendMethod() {
var watcher: BroadcastReceiver? = null
try {
suspendCoroutineWithTimeout<Boolean>(TimeUnit.SECONDS.toMillis(5)) {
watcher = object : BroadcastReceiver() {
override fun onReceive(context: Context?, intent: Intent?) {
if // your logic
it.resume(true)
}
}
context?.registerReceiver(watcher, IntentFilter(...))
//call a method that will trigger the broadcast receiver
}
} finally {
context?.unregisterReceiver(watcher)
}
}
就是这样。现在协程可以在不停止调用者线程的情况下发挥它的魔力,当作业被取消时,超时也会取消。
我正在尝试使用 Kotlin 协程来处理非阻塞 I/O。场景如下:
- 从线程 1 上的异步回调 运行 接收数据。
- 在线程 2 中等待此数据然后使用它。
我当前的代码如下所示(为简洁起见进行了简化):
private var latch = CountDownLatch(1)
private var data: Any? = null
// Async callback from non-blocking I/O
fun onReceive(data: Any) {
currentData = data
latch.countDown()
}
// Wait and consume data
fun getData(): Any? {
latch.await()
latch = CountDownLatch(1)
return currentData
}
fun processData() {
launch(CommonPool) {
while (true) {
val data = getData()
// Consume data
}
}
}
据我了解,Kotlin协程应该可以帮助我摆脱CountDownLatch。看完this (awesome) guide,我能想到的是这样的:
// Wait and consume data
fun getData() = async(CommonPool) {
latch.await()
latch = CountDownLatch(1)
currentData
}
fun processData() {
launch(CommonPool) {
while (true) {
runBlocking {
val data = getData().await()
// Consume data
}
}
}
}
我也尝试过 Pipelines,结果相似。我显然不明白如何使用这些功能。
你没有说在 onReceive()
中收到的数据是否可以并行处理。这是主要问题。如果是,您可以在 onReceive()
中完成。如果不允许,则让对 onReceive()
的每次调用都在 CommonPool
上启动一个任务,而无需任何协程。如果他们应该按顺序处理,那么最简单的方法是启动一个内部循环的线程:
fun onReceive(data: Any) {
queue.put(data);
}
....
// loop in a thread
while(true) {
data = queue.take();
processData(data);
}
同样,不需要协程。
通常,协程是语法糖,用于将异步程序表示为同步程序。我不认为你的程序是使用协程的案例。
在使用 Android 进行开发时使用 CountDownLatch
是一种非常常见的模式,有时您希望在处理 BroadcastReceivers
和 CountDownLatch
时使异步实现同步] 非常方便。
private suspend fun yourSuspendMethod() {
val job = GlobalScope.async {
val latch = CountDownLatch(1)
val watcher = object : BroadcastReceiver() {
override fun onReceive(context: Context?, intent: Intent?) {
if // your logic
latch.countDown()
}
}
try {
mContext?.registerReceiver(watcher, IntentFilter(...))
//call a method that will trigger the broadcast receiver
if (!latch.await(5, TimeUnit.SECONDS)) {
throw Exception("Failed .... on latch's timeout")
}
} finally {
mContext?.unregisterReceiver(watcher)
}
}
job.await()
}
这里有一件非常重要的事情,不要更改 CoroutineScope 的上下文,否则它们将 运行 在一个完全不同的地方,按照我在上面留下的方式成为scope/context.
[编辑]
我决定对这个问题多加思考,避免使用 CountDownLatch
。闩锁的问题是,当您调用 latch.await
时,它会停止当前线程,因此如果这是来自主线程,则主线程将等待并超时,因为它没有给要调用的接收器。解决这个问题的一种方法是使用我上面使用的示例。
我在上面的例子中忘记的一件事是,如果你想单元测试和同步调用者的上下文,你需要注入上下文。如果您决定这样做,您的实现将变得更加复杂,并且您将无法使用协程的全部功能,因为您将创建额外的线程。
因此,解决方案是使用 withTimeout
+ suspendCancellableCoroutine
的组合,您可以使用此扩展名:
suspend inline fun <T> suspendCoroutineWithTimeout(
timeout: Long,
crossinline block: (Continuation<T>) -> Unit
) = withTimeout(timeout) {
suspendCancellableCoroutine(block = block)
}
您的方法将如下所示:
private suspend fun yourSuspendMethod() {
var watcher: BroadcastReceiver? = null
try {
suspendCoroutineWithTimeout<Boolean>(TimeUnit.SECONDS.toMillis(5)) {
watcher = object : BroadcastReceiver() {
override fun onReceive(context: Context?, intent: Intent?) {
if // your logic
it.resume(true)
}
}
context?.registerReceiver(watcher, IntentFilter(...))
//call a method that will trigger the broadcast receiver
}
} finally {
context?.unregisterReceiver(watcher)
}
}
就是这样。现在协程可以在不停止调用者线程的情况下发挥它的魔力,当作业被取消时,超时也会取消。