取消信号上的 kotlin 流收集

Cancel kotlin flow collection on signal

我正在努力为流创建一个 'takeUntilSignal' 运算符 - 一种扩展方法,当另一个流生成输出时将取消流。

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T>

我最初的努力是尝试在与主要流收集相同的协程范围内启动信号流收集,并取消协程范围:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
    kotlinx.coroutines.withContext(coroutineContext) {
        launch {
            signal.take(1).collect()
            println("signalled")
            cancel()
        }
        collect {
            emit(it)
        }
    }
}

但这行不通(并使用了禁止的 "withContext" 方法,该方法被 Flow 明确禁止以防止使用)。

编辑 我拼凑了以下令人厌恶的东西,它不太符合定义(产生的流量只会在初级流量第一次发射后取消),我觉得那里有更好的方法:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
    combine(
        signal.map { it as Any? }.onStart { emit(null) }
    ) { x, y -> x to y }
        .takeWhile { it.second == null }
        .map { it.first }

edit2 再次尝试,使用 channelFlow:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> =
    channelFlow {
        launch {
            signal.take(1).collect()
            println("hello!")
            close()
        }
        collect { send(it) }
        close()
    }

使用couroutineScope并启动里面的新协程:

fun <T> Flow<T>.takeUntilSignal(signal: Flow<Unit>): Flow<T> = flow {
    try {
        coroutineScope {
            launch {
                signal.take(1).collect()
                println("signalled")
                this@coroutineScope.cancel()
            }

            collect {
                emit(it)
            }
        }

    } catch (e: CancellationException) {
        //ignore
    }
}

检查一下https://github.com/hoc081098/FlowExt


package com.hoc081098.flowext

import com.hoc081098.flowext.internal.ClosedException
import com.hoc081098.flowext.internal.checkOwnership
import kotlinx.coroutines.CoroutineStart
import kotlinx.coroutines.coroutineScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.collect
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.take
import kotlinx.coroutines.launch

/**
 * Emits the values emitted by the source [Flow] until a [notifier] [Flow] emits a value or completes.
 *
 * @param notifier The [Flow] whose first emitted value or complete event
 * will cause the output [Flow] of [takeUntil] to stop emitting values from the source [Flow].
 */
public fun <T, R> Flow<T>.takeUntil(notifier: Flow<R>): Flow<T> = flow {
  try {
    coroutineScope {
      val job = launch(start = CoroutineStart.UNDISPATCHED) {
        notifier.take(1).collect()
        throw ClosedException(this@flow)
      }

      collect { emit(it) }
      job.cancel()
    }
  } catch (e: ClosedException) {
    e.checkOwnership(this@flow)
  }
}