kotlin 让订阅者使用 RxJava2 观察可观察对象
kotlin getting a subscriber to observe an observable using RxJava2
Android Studio 3.0 Beta2
我创建了 2 个方法,一个创建可观察对象,另一个创建订阅者。
但是,我在尝试让订阅者订阅可观察对象时遇到了问题。在 Java 这会起作用,我正在努力让它在 Kotlin 中起作用。
在我的 onCreate(..) 方法中,我试图设置它。这是执行此操作的正确方法吗?
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
/* CANNOT SET SUBSCRIBER TO SUBCRIBE TO THE OBSERVABLE */
createStringObservable().subscribe(createStringSubscriber())
}
fun createStringObservable(): Observable<String> {
val myObservable: Observable<String> = Observable.create {
subscriber ->
subscriber.onNext("Hello, World!")
subscriber.onComplete()
}
return myObservable
}
fun createStringSubscriber(): Subscriber<String> {
val mySubscriber = object: Subscriber<String> {
override fun onNext(s: String) {
println(s)
}
override fun onComplete() {
println("onComplete")
}
override fun onError(e: Throwable) {
println("onError")
}
override fun onSubscribe(s: Subscription?) {
println("onSubscribe")
}
}
return mySubscriber
}
}
非常感谢您的任何建议,
看看RxKotlin,那会简化很多事情,让代码更简洁。
val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
list.toObservable() // extension function for Iterables
.filter { it.length >= 5 }
.subscribeBy( // named arguments for lambda Subscribers
onNext = { println(it) },
onError = { it.printStackTrace() },
onComplete = { println("Done!") }
)
密切关注类型。
Observable.subscribe()
具有三个基本变体:
- 不接受参数
- 几个接受
io.reactivex.functions.Consumer
- 接受
io.reactivex.Observer
您在示例中尝试订阅的类型是 org.reactivestreams.Subscriber
(定义为 Reactive Streams 规范的一部分)。您可以 refer to the docs 获得这种类型的更全面的说明,但足以说明它与任何重载的 Observable.subscribe()
方法都不兼容。
这里是您的 createStringSubscriber()
方法的修改示例,它将允许您的代码编译:
fun createStringSubscriber(): Observer<String> {
val mySubscriber = object: Observer<String> {
override fun onNext(s: String) {
println(s)
}
override fun onComplete() {
println("onComplete")
}
override fun onError(e: Throwable) {
println("onError")
}
override fun onSubscribe(s: Disposable) {
println("onSubscribe")
}
}
return mySubscriber
}
改变的是:
- 这个 returns 一个
Observer
类型(而不是 Subscriber
)
onSubscribe()
传递了一个 Disposable
(而不是 Subscription
)
.. 正如 'Vincent Mimoun-Prat' 所提到的,lambda 语法确实可以缩短您的代码。
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// Here's an example using pure RxJava 2 (ie not using RxKotlin)
Observable.create<String> { emitter ->
emitter.onNext("Hello, World!")
emitter.onComplete()
}
.subscribe(
{ s -> println(s) },
{ e -> println(e) },
{ println("onComplete") }
)
// ...and here's an example using RxKotlin. The named arguments help
// to give your code a little more clarity
Observable.create<String> { emitter ->
emitter.onNext("Hello, World!")
emitter.onComplete()
}
.subscribeBy(
onNext = { s -> println(s) },
onError = { e -> println(e) },
onComplete = { println("onComplete") }
)
}
希望对您有所帮助!
val observer = object: Observer<Int> {
override fun onNext(t: Int) {
// Perform the value of `t`
}
override fun onComplete() {
// Perform something on complete
}
override fun onSubscribe(d: Disposable) {
// Disposable provided
}
override fun onError(e: Throwable) {
// Handling error
}
}
Android Studio 3.0 Beta2
我创建了 2 个方法,一个创建可观察对象,另一个创建订阅者。
但是,我在尝试让订阅者订阅可观察对象时遇到了问题。在 Java 这会起作用,我正在努力让它在 Kotlin 中起作用。
在我的 onCreate(..) 方法中,我试图设置它。这是执行此操作的正确方法吗?
class MainActivity : AppCompatActivity() {
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
/* CANNOT SET SUBSCRIBER TO SUBCRIBE TO THE OBSERVABLE */
createStringObservable().subscribe(createStringSubscriber())
}
fun createStringObservable(): Observable<String> {
val myObservable: Observable<String> = Observable.create {
subscriber ->
subscriber.onNext("Hello, World!")
subscriber.onComplete()
}
return myObservable
}
fun createStringSubscriber(): Subscriber<String> {
val mySubscriber = object: Subscriber<String> {
override fun onNext(s: String) {
println(s)
}
override fun onComplete() {
println("onComplete")
}
override fun onError(e: Throwable) {
println("onError")
}
override fun onSubscribe(s: Subscription?) {
println("onSubscribe")
}
}
return mySubscriber
}
}
非常感谢您的任何建议,
看看RxKotlin,那会简化很多事情,让代码更简洁。
val list = listOf("Alpha", "Beta", "Gamma", "Delta", "Epsilon")
list.toObservable() // extension function for Iterables
.filter { it.length >= 5 }
.subscribeBy( // named arguments for lambda Subscribers
onNext = { println(it) },
onError = { it.printStackTrace() },
onComplete = { println("Done!") }
)
密切关注类型。
Observable.subscribe()
具有三个基本变体:
- 不接受参数
- 几个接受
io.reactivex.functions.Consumer
- 接受
io.reactivex.Observer
您在示例中尝试订阅的类型是 org.reactivestreams.Subscriber
(定义为 Reactive Streams 规范的一部分)。您可以 refer to the docs 获得这种类型的更全面的说明,但足以说明它与任何重载的 Observable.subscribe()
方法都不兼容。
这里是您的 createStringSubscriber()
方法的修改示例,它将允许您的代码编译:
fun createStringSubscriber(): Observer<String> {
val mySubscriber = object: Observer<String> {
override fun onNext(s: String) {
println(s)
}
override fun onComplete() {
println("onComplete")
}
override fun onError(e: Throwable) {
println("onError")
}
override fun onSubscribe(s: Disposable) {
println("onSubscribe")
}
}
return mySubscriber
}
改变的是:
- 这个 returns 一个
Observer
类型(而不是Subscriber
) onSubscribe()
传递了一个Disposable
(而不是Subscription
)
.. 正如 'Vincent Mimoun-Prat' 所提到的,lambda 语法确实可以缩短您的代码。
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
setContentView(R.layout.activity_main)
// Here's an example using pure RxJava 2 (ie not using RxKotlin)
Observable.create<String> { emitter ->
emitter.onNext("Hello, World!")
emitter.onComplete()
}
.subscribe(
{ s -> println(s) },
{ e -> println(e) },
{ println("onComplete") }
)
// ...and here's an example using RxKotlin. The named arguments help
// to give your code a little more clarity
Observable.create<String> { emitter ->
emitter.onNext("Hello, World!")
emitter.onComplete()
}
.subscribeBy(
onNext = { s -> println(s) },
onError = { e -> println(e) },
onComplete = { println("onComplete") }
)
}
希望对您有所帮助!
val observer = object: Observer<Int> {
override fun onNext(t: Int) {
// Perform the value of `t`
}
override fun onComplete() {
// Perform something on complete
}
override fun onSubscribe(d: Disposable) {
// Disposable provided
}
override fun onError(e: Throwable) {
// Handling error
}
}