How to convert Flow into Flowable?

I've just added

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-rx2:1.3.3"

to my project. I have a suspend fun foo(): Flow<Bar> in class A (from an external package).

I need to get a Flowable<Bar> so I can use it from Java. If possible, I'd like to use an extension fun A.fooRx(): Flowable<Bar>.

You have to sneak the returned Flow<Int> out of a coroutine on the Kotlin side:

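// SomeSuspendAPI.kt
// -----------------

import io.reactivex.Flowable
import io.reactivex.subjects.SingleSubject
import java.util.concurrent.CompletableFuture
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.launch
import kotlinx.coroutines.rx2.asFlowable

// the method to convert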
suspend fun <T> Flow<T>.foo() : Flow<Int> {
    return flow { emit(0) }
}

@ExperimentalCoroutinesApi
fun <T> Flow<T>.fooRx() : CompletableFuture<Flowable<Int>> {
    val self = this
    val future = CompletableFuture<Flowable<Int>>()
    GlobalScope.launch {
        try {
            future.complete(self.foo().asFlowable())
        } catch (ex: Throwable) {
            future.completeExceptionally(ex)
        }
    }
    return future
}

// Demo purposes
fun <T> just(v: T) = flow { emit(v) }

Then you can use it from Java:

public class UseFoo {
    public static void main(String[] args) throws Exception {
        SomeSuspendAPIKt.fooRx(
                SomeSuspendAPIKt.just(1)
        )
        .thenAccept(flowable -> flowable.subscribe(System.out::println))
        .join();
    }
}

Edit 1:

You can, of course, move some of the code back to the Kotlin side:

fun <T> Flow<T>.fooRx2() : Flowable<Int> {
    val self = this
    val subject = SingleSubject.create<Flowable<Int>>()
    GlobalScope.launch {
        try {
            subject.onSuccess(self.foo().asFlowable())
        } catch (ex: Throwable) {
            subject.onError(ex)
        }
    }
    return subject.flatMapPublisher { it }
}

and then

public class UseFoo {
    public static void main(String[] args) throws Exception {
        SomeSuspendAPIKt.fooRx2(SomeSuspendAPIKt.just(1))
                .blockingSubscribe(System.out::println);
    }
}

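Edit 2:

You can generalize this with a transformation on the Kotlin side. From Java, the suspend lambda receives a continuation object that you pass along: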

fun <T, R: Any> Flow<T>.transformAsync(fn: suspend (t: Flow<T>) -> Flow<R>) : Flowable<R> {
    val self = this
    val subject = SingleSubject.create<Flowable<R>>()
    GlobalScope.launch {
        try {
            val r = fn(self).asFlowable()
            subject.onSuccess(r)
        } catch (ex: Throwable) {
            subject.onError(ex)
        }
    }
    return subject.flatMapPublisher { it }
}

and then

public class UseFoo {
    public static void main(String[] args) throws Exception {

        SomeSuspendAPIKt.transformAsync(
                SomeSuspendAPIKt.just(1),
                (source, cont) -> SomeSuspendAPIKt.foo(source, cont)
        )
        .blockingSubscribe(System.out::println);
    }
}
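
For the exact signature asked for in the question, a shorter variant may be enough. The sketch below assumes the suspending foo() only needs to be called once per subscription: the flow { } builder takes a suspending lambda, so it can call foo() itself and re-emit the result, which avoids GlobalScope. The A and Bar classes here are hypothetical stand-ins for the external types, and emitAll is assumed to be available (it is experimental in the 1.3.x line).

```kotlin
import io.reactivex.Flowable
import kotlinx.coroutines.ExperimentalCoroutinesApi
import kotlinx.coroutines.flow.Flow
import kotlinx.coroutines.flow.emitAll
import kotlinx.coroutines.flow.flow
import kotlinx.coroutines.flow.flowOf
import kotlinx.coroutines.rx2.asFlowable

// Hypothetical stand-ins; the real A and Bar come from the external package.
data class Bar(val id: Int)

class A {
    suspend fun foo(): Flow<Bar> = flowOf(Bar(1), Bar(2))
}

// The builder body is a suspending lambda, so it may call foo() directly.
// Nothing runs until the resulting Flowable is subscribed.
@ExperimentalCoroutinesApi
fun A.fooRx(): Flowable<Bar> = flow<Bar> { emitAll(foo()) }.asFlowable()
```

From Java this is callable as a static method on the class generated for the Kotlin file, with the A instance as the first argument, in the same way as fooRx2 above.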