Kotlin 协程比 RxKotlin 好在哪里?

How Kotlin coroutines are better than RxKotlin?

我为什么要使用 Kotlin 的协程?

RxKotlin 库似乎更加通用。 相比之下,Kotlin 的协程看起来功能明显更弱,使用起来也更麻烦。

我对协程的看法基于this design talk by Andrey Breslav (JetBrains)

演讲中的幻灯片是accessible here.


编辑(感谢@hotkey):

有关协程当前状态的更好来源here.

免责声明: 该答案的部分内容无关紧要,因为协程现在具有流程 API,与 Rx 非常相似。如果您想要 up-to-date 答案,请跳至上次编辑。

Rx中有两部分; Observable 模式,以及一组可靠的操作符来操作、转换和组合它们。 Observable 模式本身并没有做很多事情。与协程相同;这只是处理异步的另一种范例。你可以比较回调、Observable 和协同程序解决给定问题的 pro/cons,但你不能将范式与功能齐全的库进行比较。这就像将语言与框架进行比较。

Kotlin 协程比 RxKotlin 好在哪里?还没有使用协程,但它看起来类似于 C# 中的 async/wait。您只需编写顺序代码,一切都和编写同步代码一样简单……除了它是异步执行的。更容易掌握。

我为什么要使用 kotlin 协程?我会自己回答。大多数时候我会坚持使用 Rx,因为我喜欢 event-driven 架构。但是如果出现我正在编写顺序代码的情况,并且我需要在中间调用一个异步方法,我会很高兴地利用协程来保持这种状态并避免将所有内容包装在 Observable 中。

编辑:现在我正在使用协程,是时候进行更新了。

RxKotlin 只是在 Kotlin 中使用 RxJava 的语法糖,所以下面我将讨论 RxJava 而不是 RxKotlin。 Coroutines 是比 RxJava 更低级和更通用的概念,它们为其他人服务 use-cases。也就是说,有一个 use-case 可以比较 RxJava 和协同程序 (channel),它异步传递数据。 Coroutines 在这方面比 RxJava 有明显的优势:

协程更好地处理资源

  • 在 RxJava 中,您可以将计算分配给调度程序,但 subscribeOn()ObserveOn() 令人困惑。每个协程都被赋予一个线程上下文和 return 父上下文。对于一个通道,双方(生产者、消费者)都在自己的上下文中执行。协程在线程或线程池影响方面更直观。
  • 协同程序可以更好地控制这些计算何时发生。例如,您可以传递手(yield)、优先级(select)、并行化(channel 上的多个 producer/actor)或锁定资源(Mutex) 对于给定的计算。在服务器上(RxJava 首先出现的地方)可能无关紧要,但在资源有限的环境中,可能需要这种级别的控制。
  • 由于它的反应性质,背压不适合 RxJava。在另一端 send() 到 channel 是一个暂停功能,当达到频道容量时暂停。这是自然给予的 out-of-the-box 背压。你也可以 offer() 到通道,在这种情况下调用永远不会挂起但是 return false 以防通道已满,有效地从 RxJava 复制 onBackpressureDrop() 。或者您可以只编写自己的自定义反压逻辑,这对于协程来说并不困难,尤其是与使用 RxJava 进行相同操作相比。

还有另一个 use-case,协程大放异彩,这将回答您的第二个问题“我为什么要使用 Kotlin 协程?”。协程是后台线程或 AsyncTask (Android) 的完美替代品。就像 launch { someBlockingFunction() } 一样简单。当然你也可以用 RxJava 实现这个,也许使用 SchedulersCompletable。您不会(或很少)使用观察者模式和作为 RxJava 签名的运算符,这暗示这项工作超出了 RxJava 的范围。 RxJava 的复杂性(这里是一种无用的税收)将使您的代码比 Coroutine 的版本更冗长和更不干净。

可读性很重要。在这方面,RxJava 和协程的做法有很大不同。协程比 RxJava 更简单。如果您对 map()flatmap() 和一般的函数式反应式编程不放心,协程操作更容易,涉及基本指令:foriftry/catch ... 但我个人觉得协程的代码对于 non-trivial 任务来说更难理解。特别是它涉及更多的嵌套和缩进,而 RxJava 中的运算符链接使一切保持一致。 Functional-style 编程使处理更明确。最重要的是,RxJava 可以使用其丰富(好吧,太丰富了)运算符集中的一些标准运算符来解决复杂的转换。当您有需要大量组合和转换的复杂数据流时,RxJava 会大放异彩。

我希望这些注意事项能帮助您根据需要选择合适的工具。

编辑: 协程现在有流程,API 非常非常类似于 Rx。可以比较每个 pro/cons,但事实是差异很小。

协程作为其核心是一种并发设计模式,具有 add-on 库,其中一个是类似于 Rx 的流 API。显然,Coroutines 的范围比 Rx 广泛得多,Coro 有很多东西Rx 不能的 tines 可以,我不能一一列举。但通常如果我在我的项目之一中使用协同程序,归结为一个原因:

协程更擅长从您的代码中删除回调

我避免使用太过损害可读性的回调。协程使异步代码变得简单易写。通过利用 suspend 关键字,您的代码看起来像同步代码。

我看到项目中使用 Rx 主要是为了替代回调的相同目的,但如果你不打算修改你的体系结构以提交到反应模式,Rx 将是一个负担。考虑这个界面:

interface Foo {
   fun bar(callback: Callback)
}

协程等价物更明确,具有 return 类型和关键字 suspend 表示它是异步操作。

interface Foo {
   suspend fun bar: Result
}

但是 Rx 等价物有问题:

interface Foo {
   fun bar: Single<Result>
}

当您在回调或协程版本中调用 bar() 时,您会触发计算;使用 Rx 版本,您可以获得可以随意触发的计算表示。您需要调用 bar() 然后订阅 Single。通常没什么大不了的,但对于初学者来说有点混乱,可能会导致微妙的问题。

此类问题的一个例子,假设回调条函数是这样实现的:

fun bar(callback: Callback) {
   setCallback(callback)
   refreshData()
}

如果移植不当,您将以只能触发一次的 Single 结束,因为 refreshData() 是在 bar() 函数中调用的,而不是在订阅时调用的。一个初学者的错误,当然,但事实是 Rx 远不止是回调替换,许多开发人员都在努力掌握 Rx。

如果您的 objective 是将异步任务从回调转换为更好的范例,Coroutines 是完美的选择,而 Rx 增加了一些复杂性。

Kotlin 协程与 Rx 不同。很难将它们进行同类比较,因为 Kotlin 协程是一种精简的语言特性(只有几个基本概念和一些基本函数来操作它们),而 Rx 是一个相当繁重的库,具有相当多的种类即用型运算符。两者都是为了解决异步编程的问题而设计的,但是它们的解决方法却大不相同:

  • Rx 带有一种特殊的函数式编程风格,几乎可以在任何编程语言中实现,而无需语言本身的支持。当手头的问题很容易分解为一系列标准运算符时效果很好,否则效果不佳。

  • Kotlin 协程提供了一种语言功能,让库编写者可以实现各种异步编程风格,包括但不限于函数式响应式风格 (Rx)。使用 Kotlin 协程,您还可以以命令式、promise/futures-based 式、actor 式等方式编写异步代码。

将Rx与一些基于Kotlin协程实现的特定库进行比较更合适。

kotlinx.coroutines library as one example. This library provides a set of primitives like async/await and channels that are typically baked into other programming languages. It also has support for light-weight future-less actors. You can read more in the Guide to kotlinx.coroutines by example.

kotlinx.coroutines 提供的频道可以在某些用例中替换或增强 Rx。有一个单独的 Guide to reactive streams with coroutines 更深入地探讨了与 Rx 的相同点和不同点。

您链接的 talk/doc 没有谈论频道。通道填补了您当前对协程和事件驱动编程的理解之间的差距。

使用协程和通道,您可以进行事件驱动编程,就像您可能习惯于使用 rx 一样,但是您可以使用看起来同步的代码来完成,而无需那么多 "custom" 运算符。

如果您想更好地理解这一点,我建议您看看 kotlin 之外的内容,那里的概念更加成熟和完善(不是实验性的)。查看 core.async 来自 Clojure、Rich Hickey 的视频、帖子和相关讨论。

协程旨在提供一个轻量级的异步编程框架。就启动异步作业所需的资源而言,它是轻量级的。协程不强制使用外部 API 并且对用户(程序员)来说更自然。相比之下,RxJava + RxKotlin 有一个额外的数据处理包,在 Kotlin 中并不是真正需要的,它在标准库中有非常丰富的 API 用于 Sequences 和 Collections 处理。

如果你想在 Android 上看到更多关于协程的实际使用,我可以推荐我的文章: https://www.netguru.com/codestories/android-coroutines-%EF%B8%8Fin-2020

我非常了解 RxJava,最近我转向了 Kotlin Coroutines 和 Flow。

RxKotlin 与 RxJava 基本相同,只是添加了一些语法糖,使在 Kotlin 中编写 RxJava 代码更加舒适/地道。

RxJava 和 Kotlin 协程之间的“公平”比较应该包括 Flow,我将尝试在这里解释原因。这会有点长,但我会尽量通过示例使其尽可能简单。

使用 RxJava 你有不同的对象(从版本 2 开始):

// 0-n events without backpressure management
fun observeEventsA(): Observable<String>

// 0-n events with explicit backpressure management
fun observeEventsB(): Flowable<String>

// exactly 1 event
fun encrypt(original: String): Single<String>

// 0-1 events
fun cached(key: String): Maybe<MyData>

// just completes with no specific results
fun syncPending(): Completable

在 kotlin 协程 + 流中你不需要很多实体因为如果你没有事件流你可以只使用简单的协程(挂起函数):

// 0-n events, the backpressure is automatically taken care off
fun observeEvents(): Flow<String>

// exactly 1 event
suspend fun encrypt(original: String): String

// 0-1 events
suspend fun cached(key: String): MyData?

// just completes with no specific results
suspend fun syncPending()

奖励:Kotlin Flow / Coroutines 支持 null 值(RxJava 2 移除了支持)

暂停函数顾名思义:它们是可以暂停代码执行并在函数完成后恢复的函数;这使您可以编写感觉更自然的代码。

运算符呢?

在 RxJava 中,你有很多操作符(mapfilterflatMapswitchMap、...),并且对于大多数操作符都有一个版本对于每个实体类型(Single.map()Observable.map()、...)。

Kotlin Coroutines + Flow 不需要那么多运算符,让我们通过一些最常见运算符的例子来看看为什么

地图()

RxJava:

fun getPerson(id: String): Single<Person>
fun observePersons(): Observable<Person>

fun getPersonName(id: String): Single<String> {
  return getPerson(id)
     .map { it.firstName }
}

fun observePersonsNames(): Observable<String> {
  return observePersons()
     .map { it.firstName }
}

Kotlin 协程 + Flow

suspend fun getPerson(id: String): Person
fun observePersons(): Flow<Person>

suspend fun getPersonName(id: String): String? {
  return getPerson(id).firstName
}

fun observePersonsNames(): Flow<String> {
  return observePersons()
     .map { it.firstName }
}

“单一”情况不需要运算符,Flow 情况非常相似。

flatMap()

flatMap 运算符和他的同胞 switchMapcontactMap 的存在是为了允许您组合不同的 RxJava 对象,从而在映射您的事件时执行潜在的异步代码。

假设您需要为每个人从数据库(或远程服务)中获取保险

RxJava

fun fetchInsurance(insuranceId: String): Single<Insurance>

fun getPersonInsurance(id: String): Single<Insurance> {
  return getPerson(id)
    .flatMap { person ->
      fetchInsurance(person.insuranceId)
    }
}

fun observePersonsInsurances(): Observable<Insurance> {
  return observePersons()
    .flatMap { person ->
      fetchInsurance(person.insuranceId) // this is a Single
          .toObservable() // flatMap expect an Observable
    }
}

让我们看看 Kotlin Coroutiens + Flow

suspend fun fetchInsurance(insuranceId: String): Insurance

suspend fun getPersonInsurance(id: String): Insurance {
  val person = getPerson(id)
  return fetchInsurance(person.insuranceId)
}

fun observePersonsInsurances(): Flow<Insurance> {
  return observePersons()
    .map { person ->
      fetchInsurance(person.insuranceId)
    }
}

像以前一样,对于简单的协程情况,我们不需要运算符,我们只需编写代码,如果它不是异步的,只需使用挂起函数。

并且 Flow 不是拼写错误,不需要 flatMap 运算符,我们可以使用 map。原因是 map lambda 是一个暂停函数!我们可以在里面执行挂起代码!!!

为此我们不需要其他操作员。

I cheated a bit here

Rx flatMap, switchMap and concatMap behave slightly differently. Rx flatMap generate a new stream for each event and than merge them all together: the order of the new streams events you receive in the output is undetermined, it might not match the order or the events in input

Rx concatMap "fixes" that and guarantee you will get each new stream in the same order of your input events

Rx switchMap will instead dispose of any previously running stream when it gets a new events, only the last input received matter with this operator

So you see, it isn't true that Flow.map is the same, it is actually more similar to Rx concatMap, which is the more natural behavior you expect from a map operator.

But it is true you need less operators, inside map you can do any async operation you want and reproduce the behavior of flatMap because it is a suspendable function. The actual equivalent operator to RxJava flatMap is Flow.flatMapMerge operator.

The equivalent of the RxJava switchMap can be achieved in Flow by using the conflate() operator before the map operator.

对于更复杂的内容,您可以使用 Flow transform() 运算符,它会为每个事件发出您选择的 Flow。

每个流算子都接受一个挂起函数!

在上一段我告诉你我作弊了。但关键是我所说的 Flow 不需要那么多操作符 是大多数操作符的回调都是挂起函数。

所以说你需要 filter() 但你的过滤器需要执行网络调用来知道你是否应该保留该值,使用 RxJava 你需要将多个运算符与不可读的代码结合起来,使用 Flow 你可以只使用 filter()!

fun observePersonsWithValidInsurance(): Flow<Person> {
  return observerPersons()
    .filter { person ->
        val insurance = fetchInsurance(person.insuranceId) // suspending call
        insurance.isValid()
    }
}

delay(), startWith(), concatWith(), ...

在 RxJava 中,您有许多用于应用延迟或在前后添加项目的运算符:

  • 延迟()
  • 延迟订阅()
  • 开始(T)
  • startWith(可观察)
  • concatWith(...)

使用 kotlin Flow,您可以简单地:

grabMyFlow()
  .onStart {
    // delay by 3 seconds before starting
    delay(3000L)
    // just emitting an item first
    emit("First item!")
    emit(cachedItem()) // call another suspending function and emit the result
  }
  .onEach { value ->
    // insert a delay of 1 second after a value only on some condition
    if (value.length() > 5) {
      delay(1000L)
    }
  }
  .onCompletion {
    val endingSequence: Flow<String> = grabEndingSequence()
    emitAll(endingSequence)
  }

错误处理

RxJava 有很多运算符来处理错误:

  • onErrorResumeWith()
  • onErrorReturn()
  • onErrorComplete()

有了 Flow,你只需要运算符 catch():

  grabMyFlow()
    .catch { error ->
       // emit something from the flow
       emit("We got an error: $error.message")
       // then if we can recover from this error emit it
       if (error is RecoverableError) {
          // error.recover() here is supposed to return a Flow<> to recover
          emitAll(error.recover())
       } else {
          // re-throw the error if we can't recover (aka = don't catch it)
          throw error
       }
    }

而且有了挂起功能就可以用try {} catch() {}.

您可以使用一个 catch 运算符实现所有 RxJava 错误运算符,因为您获得了一个挂起函数。

易于编写流操作符

由于协程在后台为 Flow 提供动力,因此编写运算符要容易得多。如果你曾经检查过 RxJava 运算符,你会发现它有多难以及你需要学习多少东西。

编写 Kotlin Flow 运算符更容易,您只需查看已经包含在 Flow 中的运算符的源代码就可以了解 here。原因是协同程序使编写异步代码变得更容易,而且运算符使用起来感觉更自然。

作为奖励,Flow operators 都是 kotlin Extension Functions,这意味着无论是你,还是库,都可以很容易地添加 operators 并且它们使用起来不会感到奇怪(在 RxJava observable.lift()observable.compose() 需要组合自定义运算符)。

上游线程不泄漏下游

今天晚上做什么意思是?

这解释了为什么在 RxJava 中你有 subscribeOn()observeOn() 而在 Flow 中你只有 flowOn().

让我们以这个 RxJava 为例:

urlsToCall()
  .switchMap { url ->
    if (url.scheme == "local") {
       val data = grabFromMemory(url.path)
       Flowable.just(data)
    } else {
       performNetworkCall(url)
        .subscribeOn(Subscribers.io())
        .toObservable()
    }
  }
  .subscribe {
    // in which thread is this call executed?
  }

那么subscribe中的回调在哪里执行?

答案是:

depends...

如果它来自网络,它在一个IO线程中;如果它来自另一个分支它是未定义的,取决于哪个线程用于发送 url.

If you think about it, any code you write: you don't know in which thread it is gonna be executed: always depends on the caller. The issue here is that the Thread doesn't depends on the caller anymore, it depends on what an internal function call does.

Suppose you have this plain, standard code:

fun callUrl(url: Uri) {
  val callResult = if (url.scheme == "local") {
    grabFromMemory(url.path)
  } else {
    performNetworkCall(url)
  }
  return callResult
}

Imagine not having a way of knowing in which thread the line return callResult is executed in without looking inside grabFromMemory() and performNetworkCall().

Think about that for a second: having the thread change based on which function you call and what they do inside.

This happens all the time with callbacks APIs: you have no way of knowing in which thread the callback you provide will be executed unless documented.

这就是“上游线程泄漏下游”的概念。

对于 Flow 和协程,情况并非如此,除非您明确要求此行为(使用 Dispatchers.Unconfined)。

suspend fun myFunction() {
  // execute this coroutine body in the main thread
  withContext(Dispatchers.Main) {
    urlsToCall()
      .conflate() // to achieve the effect of switchMap
      .transform { url ->
        if (url.scheme == "local") {
           val data = grabFromMemory(url.path)
           emit(data)
        } else {
           withContext(Dispatchers.IO) {
             performNetworkCall(url)
           }
        }
      }
      .collect {
        // this will always execute in the main thread
        // because this is where we collect,
        // inside withContext(Dispatchers.Main)
      }
  }
}

协程代码将 运行 在它们被执行的上下文中。只有网络调用的部分会在 IO 线程上 运行,而我们在这里看到的所有其他内容都会在主线程上 运行。

嗯,实际上,我们不知道 grabFromMemory() 中的代码会 运行,但我们不在乎:我们知道它会在主线程中调用,在那个挂起函数我们可以使用另一个 Dispatcher,但我们知道它什么时候返回结果 val data 这将再次出现在主线程中。

这意味着,查看一段代码,更容易判断它将在哪个线程中 运行,如果您看到一个明确的 Dispatcher = 它就是那个调度程序,如果您没有看到它:在任何情况下thread dispatcher 正在调用您正在查看的暂停调用。

结构化并发

这不是 kotlin 发明的概念,但这是他们比我所知道的任何其他语言都更接受的东西。

如果我在这里解释的内容不足以让您阅读this article or watch this video

那是什么?

使用 RxJava,您可以订阅可观察对象,它们会为您提供一个 Disposable 对象。

您需要在不再需要时处理掉它。因此,您通常做的是保留对它的引用(或将其放在 CompositeDisposable 中),以便稍后在不再需要时调用 dispose()。如果你不这样做,linter 会给你一个警告。

RxJava 比传统的线程要好一些。当您创建一个新线程并在其上执行某些操作时,这是一个“即发即弃”的过程,您甚至没有办法取消它:Thread.stop() 已被弃用,有害,并且最近的实现实际上什么都不做。 Thread.interrupt() 使您的线程失败等。任何异常都会丢失。您明白了。

使用 kotlin 协程和流,它们颠覆了“一次性”概念。你不能创建没有 CoroutineContext.

的协程

此上下文定义协程的 scope。在其中产生的每个子协程都将共享相同的范围。

如果您订阅流,则必须在协程内或也提供范围。

您仍然可以保留对您启动的协程 (Job) 的引用并取消它们。这将自动取消该协同程序的每个子程序。

如果您是 Android 开发人员,他们会自动为您提供这些范围。示例:viewModelScope 并且您可以在具有该范围的 viewModel 中启动协程,知道它们将在清除 viewmodel 时自动取消。

viewModelScope.launch {
  // my coroutine here
}

如果任何子级失败,一些作用域将终止,另一些作用域将让每个子级离开自己的生命周期,而不会在一个失败时停止其他子级 (SupervisedJob)。

为什么这是一件好事?

让我尝试像 Roman Elizarov 那样解释它。

一些旧的编程语言有这个概念 goto 基本上可以让你随意从一行代码跳到另一行代码。

非常强大,但如果滥用,您最终可能会得到非常难以理解的代码,难以调试和推理。

所以新的编程语言最终将它从语言中完全删除。

当你使用 ifwhilewhen 时,在代码上推理会更容易:不管这些块内发生什么,你最终都会来在它们之外,这是一个“上下文”,你没有奇怪的跳进跳出。

启动一个线程或订阅一个 RxJava observable 类似于 goto:你正在执行的代码将继续执行直到“其他地方”停止。

使用协程,通过要求您提供一个 context/scope,您知道当您的作用域遍及内部的所有内容时,协程将在您的上下文完成时完成,无论您有一个协程还是 10 个协程千

您仍然可以通过使用 GlobalScope 来“转到”协程,出于同样的原因您不应该在提供它的语言中使用 goto

冷与热 - ShareFlow 和 StateFlow

当我们使用反应流时,我们总是有冷流和热流的概念。这些是 Rx 世界和 Kotlin 的概念低点

Cold 流就像我们代码中的一个函数:它就在那里,在您调用它之前什么都不做。对于流,这意味着它定义了流的作用,但在您开始收集流之前它什么都不做。而且,就像一个函数,如果你收集(调用)它两次,流将 运行s 两次。 (例如,执行 http 请求的冷流如果收集了两次,将执行请求两次)。

Hot 流不是那样工作的。当您对它们进行多个 collect 调用时,它们都在后台共享相同的热流,这意味着您的热流 运行 一次,您可以有多个观察者。

您通常可以通过一些操作员将冷流变成热流。

在 RxJava 上你可以使用 Connectable Observable/Flowable.

这个概念
val coldObservable: Observable<Something> = buildColdObservable()

// create an hot observable from the cold one
val connectableObservable: ConnectableObservable<Something> = coldObservable.publish()

// you can subscribe multiple times to this connectable
val subADisposable: Disposable = connectableObservable.subscribe(subscriberA)
val subBDisposable: Disposable = connectableObservable.subscribe(subscriberB)

// but nothing will be emitted there until you call
val hotDisposable: Disposable = connectableObservable.connect()

// which actually run the cold observable and share the result on bot subscriberA and subscriberB

// while it's active another one can start listening to it
val subCDisposable: Disposable = connectableObservable.subscribe(subscriberC)

你有其他有用的运算符,如 refCount()autoConnect(),它们将 Connectable 返回标准流,并在第一个订阅者时自动 .connect()已附上。


buildColdObservable()
   .replay(1) // when a new subscriber is attached receive the last data instantly
   .autoConnect() // keep the cold observable alive while there's some subscriber

On Flow 你有 shareIn()stateIn() 运算符。您可以看到 API 设计 here。当您“连接”时,它们在处理时较少“手动”。

buildColdFlow()
  .shareIn(
    // you need to specify a scope for the cold flow subscription
    scope = myScope,
    // when to "connect"
    started = SharingStarted.WhileSubscribed(),
    // how many events already emitted should be sent to new subscribers
    replay = 1,
  )

范围

范围是结构化并发。在 RxJava 上,connect() 操作实际上订阅了冷可观察对象,它给了你一个 Disposable,你将不得不在某个地方调用 .dispose()。如果您使用 refCount()autoConnect(),它会在第一个订阅者上被调用,并且 refCount() 永远不会被释放,而 autoConnect() 在没有更多订阅者时被释放。

使用 Flow,您需要提供一个专用范围来收集冷流,如果您取消该范围,冷流将停止发射并且不再可用。

开始

所以这个很简单

  • RxJava refCount() --> Flow SharingStarted.Lazily,开始收集第一个订阅者
  • RxJava autoConnect() -> Flow SharingStarted.WhileSubscribed(),开始收集第一个订阅者并在没有订阅者时取消它
  • RxJava 在任何订阅之前手动调用 connect() -> Flow SharingStarted.Eagerly(),立即开始收集

WhileSubscribed()有有用的参数,check them out

您还可以为 SharingStarted 定义自己的逻辑,以便在从 coldFlow 收集数据时进行处理。

行为和背压

当你有一个热的可观察对象时,你总是有背压问题需要处理。通过多种方式收听的 1 个数据源,一个收听者可能比其他收听者慢。

Flow .shareIn 在专用协程中收集冷流并默认缓冲发射。这意味着如果冷流发出的速度太快,它将使用缓冲区。您可以更改此行为。

Kotlin SharedFlow 还允许您在需要时直接访问重播缓冲区以检查之前的发射。

取消订阅者不会影响共享流。

使用flowOn()更改订阅者上的Dispatcher不会影响分享流量(如果需要运行冷,请在分享前使用flowOn()在某个特定的调度程序中流式传输)

stateIn

Flow 有一个名为 StateFlowShareFlow 的“特殊”版本,您可以使用 stateIn() 从另一个流创建一个。

一个StateFlow总是有1个值,它不能为“空”,所以你做stateIn()时需要提供初始值。

一个StateFlow永远不会抛出异常也永远不会终止(这种方式类似于RxRelay库中的BehaviorRelay

A StateFlow 只会在状态改变时发出(就像它在 distinctUntilChanged().

中有构建一样

RxJava 主题与可变*流 RxJava 中的 Subject 是一个 class,您可以使用它手动推送数据,同时仍将其用作流。

在 Flow 中,您可以使用 MutableSharedFlowMutableStateFlow 来达到类似的效果。

对于 Kotlin 协同程序,您也可以使用 Channels,但它们被认为是较低级别 API。

有什么缺点吗?

Flow 仍在开发中,RxJava 中可用的一些功能在 Kotlin Coroutines Flow 中可能被标记为实验性的,或者这里和那里有一些不同。

一些小众运算符或运算符函数可能尚未实现,您可能必须自己实现(至少更容易)。

但除此之外,我所知道的没有任何缺点。

然而,有一些差异需要注意,这可能会导致从 RxJava 转换时出现一些摩擦,需要您学习新的东西。

结构化并发向前迈进了一步,但引入了您需要学习和习惯的新概念(范围、主管作业):取消的处理方式完全不同。

需要注意一些问题。

陷阱:取消异常

如果您 cancel() 协程中的作业或 throw CancellationException() 除非您使用主管范围/作业,否则异常会传播到父协程。 如果发生这种情况,父协程也会取消被取消协程的兄弟协程。

但是如果你 catch(e: Exception),即使使用 runCatching {},你必须记得重新抛出 CancellationException() 否则你会得到意想不到的结果,因为协程已被取消但你的代码仍然存在试图表现得像不是那样。

陷阱:UncaughtExceptionHandler

如果您 launch { ... } 创建一个新的协程并且该协程抛出 by default,这将终止协程但不会使应用程序崩溃,您可能会完全错过出错的地方。

此代码不会使您的应用程序崩溃。

launch {
  throw RuntimeException()
}

在某些情况下,它甚至可能不会在日志中打印任何内容。

如果是取消异常,它肯定不会在日志中打印任何内容。

来到 RxJava,RxJava 流很容易发生泄漏,即使您不再关心,流也会继续处理项目。 Kotlin 协程使用结构化并发,这使得管理所有并发 [​​=13=] 的生命周期变得更加容易,正如它在罐头上所说的那样,仅限于 Java。协程可在任何 Kotlin-supported 平台上运行,因此如果我们想在 Android 和 iOS 之间共享异步代码,我们可以使用协程来实现。