将数据从回调推送到 Kotlin 协程的方法是什么

What is the way to push data from a callback to a Kotlin coroutine

假设我想在 DownloadQueue 之类的示例中使用缓存。 协程的乐趣在于可以在单线程算法中使用简单的数据结构(例如 HashMap)。

但是我想使用有限的缓存,并将额外的条目逐出到较慢的持久存储中。 例如,我可以取 Guava + CacheBuilder + RemovalListener: https://github.com/google/guava/wiki/CachesExplained#removal-listeners

这里有一个问题:RemovalListener 是一个非挂起的回调,因此无法将数据推送到 Channels 或来自回调的任何内容。

// This is placed in a suspend context (e.g. within `actor`)
val cache = CacheBuilder.newBuilder().maximumSize(1000)
    .expireAfterWrite(10, TimeUnit.SECONDS)
    .removalListener<Key, Value> { n ->
        expireChannel.sendBlocking(n.value!!) // <-- could sendBlocking be used here???
    }.build<Key, Value>()

据我了解,runBlocking 阻塞了当前线程的执行,因此它阻塞了当前协程,并且无法 receive/handle 数据。

有没有办法使 RemovalListener 类回调协程友好?

我可以想象一种解决方法,即使用由 RemovalListener 填充的无界队列,而相关协程最终会检查队列。

不阻塞删除侦听器(从而破坏缓存)的唯一方法是拥有一个缓冲通道。这意味着两个变化:

  1. 创建具有非零容量的 SendChannel,请参阅 Channel(capacity) 以考虑不同的行为。这主要取决于如果过期的商品超过消费者可以处理的数量,应该采取什么行为。
  2. 使用 SendChannel.offer() 而不是 SendChannel.sendBlocking()。如果它 returns false 你可以实现备份行为,比如记录失败,这样你就可以找出内存和丢失事件之间的最佳平衡。