当我在 FlowableOnSubscribe class 中调用 onNext 时,我的订阅者的 onNext 和 onComplete 函数没有 运行
My subscriber's onNext and onComplete functions do not run when I call onNext within my FlowableOnSubscribe class
在一个使用 RxJava 2 的 Android 项目中,我在初始 activity:
的 onCreate
中创建了一个像这样的 Flowable
Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER)
.doOnComplete(new MyAction())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MySubscriber());
FlowableOnSubscribe 的实现是:
public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe<String> {
public static final String TAG = "XX MyFlOnSub1";
@Override
public void subscribe(FlowableEmitter<String> emitter) {
Log.i(TAG, "subscribe");
emitter.onNext("hello");
emitter.onComplete();
}
}
这是订户实现:
public class MySubscriber implements Subscriber<String> {
public static final String TAG = "XX MySubscriber";
@Override
public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: " + s);
}
}
动作实现是:
public class MyAction implements Action {
public static final String TAG = "XX MyAction";
@Override
public void run() {
Log.i(TAG, "run");
}
}
在我的输出中,我期待 onNext
的日志语句,但我没有看到。相反,这是我的全部输出:
02-23 17:56:31.334 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
这表明 onNext
永远不会 运行,而 onComplete
甚至 运行 也不会。但是 MyAction
运行 成功了。
以下是我注释掉对 onNext
的调用时发生的情况:
02-23 17:58:31.572 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
02-23 17:58:31.652 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onComplete
在这种情况下 onNext
当然不会 运行,但至少 onComplete
运行s。
我预计在这两种情况下我都会看到 onComplete
运行,而当我调用 emitter.onNext
时我会看到 onNext
运行。我在这里做错了什么?
你如何测试它?您的主线程是否有可能在 Observable 有机会发出结果之前退出,因为您正在使用 Schedulers.IO
作为订阅线程。此外,您的 observeOn
不会执行任何操作,因为它仅用于进一步的下游运算符。
您需要手动发出请求,否则直接扩展 Subscriber
时不会发出数据:
@Override
public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);
}
或者,您可以扩展 DisposableSubscriber
或 ResourceSubscriber
。
在一个使用 RxJava 2 的 Android 项目中,我在初始 activity:
的onCreate
中创建了一个像这样的 Flowable
Flowable.create(new MyFlowableOnSubscribe1(), BackpressureStrategy.BUFFER)
.doOnComplete(new MyAction())
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new MySubscriber());
FlowableOnSubscribe 的实现是:
public class MyFlowableOnSubscribe1 implements FlowableOnSubscribe<String> {
public static final String TAG = "XX MyFlOnSub1";
@Override
public void subscribe(FlowableEmitter<String> emitter) {
Log.i(TAG, "subscribe");
emitter.onNext("hello");
emitter.onComplete();
}
}
这是订户实现:
public class MySubscriber implements Subscriber<String> {
public static final String TAG = "XX MySubscriber";
@Override
public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe");
}
@Override
public void onComplete() {
Log.i(TAG, "onComplete");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "onError");
}
@Override
public void onNext(String s) {
Log.i(TAG, "onNext: " + s);
}
}
动作实现是:
public class MyAction implements Action {
public static final String TAG = "XX MyAction";
@Override
public void run() {
Log.i(TAG, "run");
}
}
在我的输出中,我期待 onNext
的日志语句,但我没有看到。相反,这是我的全部输出:
02-23 17:56:31.334 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:56:31.334 24176-24219/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
这表明 onNext
永远不会 运行,而 onComplete
甚至 运行 也不会。但是 MyAction
运行 成功了。
以下是我注释掉对 onNext
的调用时发生的情况:
02-23 17:58:31.572 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onSubscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyFlOnSub1: subscribe
02-23 17:58:31.572 24176-26715/com.ebelinski.rxjavaexperimentproject I/XX MyAction: run
02-23 17:58:31.652 24176-24176/com.ebelinski.rxjavaexperimentproject I/XX MySubscriber: onComplete
在这种情况下 onNext
当然不会 运行,但至少 onComplete
运行s。
我预计在这两种情况下我都会看到 onComplete
运行,而当我调用 emitter.onNext
时我会看到 onNext
运行。我在这里做错了什么?
你如何测试它?您的主线程是否有可能在 Observable 有机会发出结果之前退出,因为您正在使用 Schedulers.IO
作为订阅线程。此外,您的 observeOn
不会执行任何操作,因为它仅用于进一步的下游运算符。
您需要手动发出请求,否则直接扩展 Subscriber
时不会发出数据:
@Override
public void onSubscribe(Subscription s) {
Log.i(TAG, "onSubscribe");
s.request(Long.MAX_VALUE);
}
或者,您可以扩展 DisposableSubscriber
或 ResourceSubscriber
。