RxJava:推荐的从 NoSql DB 异步保存和检索数据的发射器是什么(即反应式方法)

RxJava: what is the recommended emitter for saving and retrieving data from NoSql DB asynchronously (i.e. reactive approach)

背景:这是我第一次使用 RxJava。

当使用 RxJava 编码为 (1) select 来自 NoSql DB 的文档和 (2) 插入到 NoSql(例如 MongoDb)时,考虑到 Reactive,推荐的发射器是什么堆叠?

例如,我通常应该更喜欢使用 Flowable 来阅读而使用 Single 来保存吗?

此代码完美地将从 Kafka 主题接收到的消息保存到 MongoDb,但我想知道 io.reactivex.Single 是否真的是完成它的最佳方法。

import com.mongodb.client.result.InsertOneResult
import com.mongodb.reactivestreams.client.MongoClient
import com.mongodb.reactivestreams.client.MongoCollection
import io.micronaut.configuration.kafka.annotation.KafkaKey
import io.micronaut.configuration.kafka.annotation.KafkaListener
import io.micronaut.configuration.kafka.annotation.OffsetReset
import io.micronaut.configuration.kafka.annotation.Topic
import io.micronaut.messaging.annotation.Body
import io.reactivex.Observable
import io.reactivex.Single
import javax.inject.Inject
import io.reactivex.functions.Function

@KafkaListener(offsetReset = OffsetReset.EARLIEST)
class DebitConsumer {

    @Inject
    lateinit var mongoClient: MongoClient

    @Topic("debit")
    fun receive(@KafkaKey key: String, name: String) {

        save(key.toInt(), name)

    }

    private fun save( id: Int?,name: String?) {
        val debitMessage =  DebitMessage(id, name)
        Single
                .fromPublisher(getCollection().insertOne(debitMessage))
                .map<DebitMessage>(Function<InsertOneResult, DebitMessage> { debitMessage })
                .subscribe()
    }

    private fun getCollection(): MongoCollection<DebitMessage?> {
        return mongoClient
                .getDatabase("mydb")
                .getCollection("mycollection", DebitMessage::class.java)
    }
}

我来自 Spring 数据,它在 Reactive 世界中有点直截了当的 CRUD,出于与这个问题无关的原因,我不会使用 Spring,我正在寻找最佳实践而 writing/reading 数据在 Reactive/No Blocking/BackPressure 世界上。

您的代码看起来不错。 Single 对保存有意义,因为您只会得到一个结果。 Flowable 对阅读有意义,但真正的选择取决于您和您的应用程序。你想通过 Change Streams 监听数据库变化吗?然后你将不得不使用 Flowable 以便你可以对流中的多个更新做出反应。使用 Flowable 可能是个好习惯,即使目前您不收听多个更新,但您认为将来可能会。

如果您确定只想处理 1 个事件,请使用 Single。它将为您在处理多个事件的可能性的应用程序代码中节省一些精力。