kotlin 让订阅者使用 RxJava2 观察可观察对象

kotlin getting a subscriber to observe an observable using RxJava2

Android Studio 3.0 Beta2

我创建了 2 个方法,一个创建可观察对象,另一个创建订阅者。

但是,我在尝试让订阅者订阅可观察对象时遇到了问题。在 Java 这会起作用,我正在努力让它在 Kotlin 中起作用。

在我的 onCreate(..) 方法中,我试图设置它。这是执行此操作的正确方法吗?

class MainActivity : AppCompatActivity() {

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        /* CANNOT SET SUBSCRIBER TO SUBCRIBE TO THE OBSERVABLE */
        createStringObservable().subscribe(createStringSubscriber())
    }


    fun createStringObservable(): Observable<String> {
        val myObservable: Observable<String> = Observable.create {
            subscriber ->
            subscriber.onNext("Hello, World!")
            subscriber.onComplete()
        }

        return myObservable
    }

    fun createStringSubscriber(): Subscriber<String> {
        val mySubscriber = object: Subscriber<String> {
            override fun onNext(s: String) {
                println(s)
            }

            override fun onComplete() {
                println("onComplete")
            }

            override fun onError(e: Throwable) {
                println("onError")
            }

            override fun onSubscribe(s: Subscription?) {
                println("onSubscribe")
            }
        }

        return mySubscriber
    }
}

非常感谢您的任何建议,

看看RxKotlin,那会简化很多事情,让代码更简洁。

val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")

list.toObservable() // extension function for Iterables
        .filter { it.length >= 5 }
        .subscribeBy(  // named arguments for lambda Subscribers
                onNext = { println(it) },
                onError =  { it.printStackTrace() },
                onComplete = { println("Done!") }
        )

密切关注类型。

Observable.subscribe() 具有三个基本变体:

  • 不接受参数
  • 几个接受 io.reactivex.functions.Consumer
  • 接受 io.reactivex.Observer

您在示例中尝试订阅的类型是 org.reactivestreams.Subscriber(定义为 Reactive Streams 规范的一部分)。您可以 refer to the docs 获得这种类型的更全面的说明,但足以说明它与任何重载的 Observable.subscribe() 方法都不兼容。

这里是您的 createStringSubscriber() 方法的修改示例,它将允许您的代码编译:

fun createStringSubscriber(): Observer<String> {
        val mySubscriber = object: Observer<String> {
            override fun onNext(s: String) {
                println(s)
            }

            override fun onComplete() {
                println("onComplete")
            }

            override fun onError(e: Throwable) {
                println("onError")
            }

            override fun onSubscribe(s: Disposable) {
                println("onSubscribe")
            }
        }

        return mySubscriber
    }

改变的是:

  1. 这个 returns 一个 Observer 类型(而不是 Subscriber
  2. onSubscribe() 传递了一个 Disposable(而不是 Subscription

.. 正如 'Vincent Mimoun-Prat' 所提到的,lambda 语法确实可以缩短您的代码。

    override fun onCreate(savedInstanceState: Bundle?) {
        super.onCreate(savedInstanceState)
        setContentView(R.layout.activity_main)

        // Here's an example using pure RxJava 2 (ie not using RxKotlin)
        Observable.create<String> { emitter ->
            emitter.onNext("Hello, World!")
            emitter.onComplete()
        }
                .subscribe(
                        { s -> println(s) },
                        { e -> println(e) },
                        {      println("onComplete") }
                )

        // ...and here's an example using RxKotlin. The named arguments help
        // to give your code a little more clarity
        Observable.create<String> { emitter ->
            emitter.onNext("Hello, World!")
            emitter.onComplete()
        }
                .subscribeBy(
                        onNext     = { s -> println(s) },
                        onError    = { e -> println(e) },
                        onComplete = {      println("onComplete") }
                )
    }

希望对您有所帮助!

val observer = object: Observer<Int> {
    override fun onNext(t: Int) {
        // Perform the value of `t`
    }
    override fun onComplete() {
        // Perform something on complete
    }
    override fun onSubscribe(d: Disposable) {
        // Disposable provided
    }
    override fun onError(e: Throwable) {
        // Handling error
    }
}