将 publishOn 与自定义发布者一起使用时的 ReactiveStreams NPE
ReactiveStreams NPE when using publishOn with custom Publisher
当我将 Reactive Streams (https://github.com/reactor/reactor-core) 与自定义 Publisher
结合使用 publishOn
函数时,我总是会遇到 NPE。我的代码有什么问题?我是否以错误的方式使用了 Publisher
?
Flux.from(MyPublisher())
.publishOn(Schedulers.single())
.subscribe { println("<-- $it received") }
class MyPublisher : Publisher<Int> {
override fun subscribe(sub: Subscriber<in Int>) {
while (true) {
Thread.sleep(300)
sub.onNext(1)
}
}
}
例外情况是:
Exception in thread "main" java.lang.NullPointerException
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:212)
at org.guenhter.kotlin.hello.MyPublisher.subscribe(HelloWorld.kt:18)
at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:52)
at reactor.core.publisher.FluxPublishOn.subscribe(FluxPublishOn.java:96)
at reactor.core.publisher.Flux.subscribe(Flux.java:6447)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:6614)
at reactor.core.publisher.Flux.subscribe(Flux.java:6440)
at reactor.core.publisher.Flux.subscribe(Flux.java:6404)
at reactor.core.publisher.Flux.subscribe(Flux.java:6347)
at org.guenhter.kotlin.hello.HelloWorldKt.main(HelloWorld.kt:11)
Publisher
由 "reactive-streams" 标准定义,并有许多要求。这些要求之一是 Subscriber.onSubscribe
必须在任何其他方法之前调用才能遵循协议。
因为你还没有这样做,这意味着可能没有正确初始化,导致反应器内部的 NPE class。
然而,即使您解决了这个问题,该标准也被设计为 反应式 ,这意味着它仅在订阅者请求时才发出数据。因为您将向其发送数据,而不管稍后可能会导致异常。使用 Flux.create
创建一个可以正确处理请求的发射器,而不是创建您自己的 Publisher 实现。
当我将 Reactive Streams (https://github.com/reactor/reactor-core) 与自定义 Publisher
结合使用 publishOn
函数时,我总是会遇到 NPE。我的代码有什么问题?我是否以错误的方式使用了 Publisher
?
Flux.from(MyPublisher())
.publishOn(Schedulers.single())
.subscribe { println("<-- $it received") }
class MyPublisher : Publisher<Int> {
override fun subscribe(sub: Subscriber<in Int>) {
while (true) {
Thread.sleep(300)
sub.onNext(1)
}
}
}
例外情况是:
Exception in thread "main" java.lang.NullPointerException
at reactor.core.publisher.FluxPublishOn$PublishOnSubscriber.onNext(FluxPublishOn.java:212)
at org.guenhter.kotlin.hello.MyPublisher.subscribe(HelloWorld.kt:18)
at reactor.core.publisher.FluxSource.subscribe(FluxSource.java:52)
at reactor.core.publisher.FluxPublishOn.subscribe(FluxPublishOn.java:96)
at reactor.core.publisher.Flux.subscribe(Flux.java:6447)
at reactor.core.publisher.Flux.subscribeWith(Flux.java:6614)
at reactor.core.publisher.Flux.subscribe(Flux.java:6440)
at reactor.core.publisher.Flux.subscribe(Flux.java:6404)
at reactor.core.publisher.Flux.subscribe(Flux.java:6347)
at org.guenhter.kotlin.hello.HelloWorldKt.main(HelloWorld.kt:11)
Publisher
由 "reactive-streams" 标准定义,并有许多要求。这些要求之一是 Subscriber.onSubscribe
必须在任何其他方法之前调用才能遵循协议。
因为你还没有这样做,这意味着可能没有正确初始化,导致反应器内部的 NPE class。
然而,即使您解决了这个问题,该标准也被设计为 反应式 ,这意味着它仅在订阅者请求时才发出数据。因为您将向其发送数据,而不管稍后可能会导致异常。使用 Flux.create
创建一个可以正确处理请求的发射器,而不是创建您自己的 Publisher 实现。