PublishProcessor flatMap operator does not get executed (rxjava-2)
I have the following code:
/**
 * Request wrapped around flowable.
 */
public abstract class RequestFlowable<T> {
    private final PublishProcessor<String> mPublish;
    private String mName;

    public RequestFlowable(String name) {
        mName = name;
        mPublish = PublishProcessor.create();
    }

    public Flowable<T> getFlowable() {
        //return createAction();
        return mPublish.compose(new FlowableTransformer<String, T>() {
            @Override
            public Publisher<T> apply(@NonNull Flowable<String> upstream) {
                return createAction();
            }
        });
        /*
        return mPublish.flatMap(new Function<String, Publisher<? extends T>>() {
            @Override
            public Publisher<? extends T> apply(@NonNull String s) throws Exception {
                return createAction();
            }
        });
        */
    }

    protected abstract Flowable<T> createAction();

    public String getName() {
        return mName;
    }

    public void start() {
        mPublish.onNext("processCommand");
    }

    @Override
    public String toString() {
        return "Request: " + mName;
    }
}
Now with Single (EDIT 2):
public abstract class Request<T> {
    private final SingleSubject<Object> mPublish;
    private String mName;

    public Request(String name) {
        mName = name;
        mPublish = SingleSubject.create();
    }

    public Single<T> getSingle() {
        return mPublish.flatMap(o -> createAction());
    }

    protected abstract Single<? extends T> createAction();

    public String getName() {
        return mName;
    }

    public void start() {
        mPublish.onSuccess("Start");
    }

    @Override
    public String toString() {
        return "Request: " + mName;
    }
}
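The RequestFlowable code above works when used with compose, as shown. However, if I use the commented-out code instead (that is, flatMap), createAction is for some reason not executed.
EDIT 2
The code above is called from another class. The corresponding code is below (only the important parts of the class are included):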
public class RequestQueue implements RequestController {
    private static final String TAG = RequestQueue.class.getSimpleName();
    private PublishSubject<Request> mRequest;
    private PublishSubject<RequestFlowable> mRequestFlowable;

    @Override
    public <T> Single<T> registerRequest(Request<T> request) {
        mRequest.onNext(request);
        return request.getSingle();
    }

    @Override
    public <T> Flowable<T> registerRequestFlowable(RequestFlowable<T> request) {
        mRequestFlowable.onNext(request);
        return request.getFlowable();
    }

    public RequestQueue() {
        mRequest = PublishSubject.create();
        mRequestFlowable = PublishSubject.create();
        mRequest.subscribe(this::actionOnRequest);
        mRequestFlowable.subscribe(this::actionOnRequest);
    }

    private void actionOnRequest(Request request) {
        Log.d(TAG, "actionOnRequest() called with: request = [" + request + "]");
        request.start();
    }

    private void actionOnRequest(RequestFlowable request) {
        Log.d(TAG, "actionOnRequest() called with: request = [" + request + "]");
        request.start();
    }
}
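(From my comments:)
Why does Single work?
SingleSubject retains the single terminal event it received. Because it can only receive onSuccess or onError, it "replays" that event to late subscribers (which is also why there is no separate ReplaySingleSubject). When you call onSuccess on the SingleSubject, the value is remembered and re-emitted when the subscription happens later, which invokes your createAction. PublishProcessor also remembers its terminal events, but onNext is not a terminal event, so it is dropped when there is no consumer.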
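The difference can be reproduced in isolation. The following is a minimal sketch, assuming RxJava 2 is on the classpath; the class name LateSubscriberDemo, its method names, and the "action for ..." strings are hypothetical stand-ins for createAction, not part of the original code. Both methods emit first and subscribe afterwards, which mirrors start() being called before the returned stream is subscribed to.

```java
import io.reactivex.Flowable;
import io.reactivex.Single;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.subjects.SingleSubject;

import java.util.ArrayList;
import java.util.List;

final class LateSubscriberDemo {

    /** Emits on a PublishProcessor before anyone subscribes, then subscribes. */
    static List<String> viaPublishProcessor() {
        List<String> received = new ArrayList<>();
        PublishProcessor<String> processor = PublishProcessor.create();

        // No subscriber yet: onNext is not a terminal event, so the item is dropped.
        processor.onNext("processCommand");

        // The late subscriber never sees the item, so the flatMap function never runs.
        processor.flatMap(s -> Flowable.just("action for " + s))
                 .subscribe(value -> received.add(value));
        return received;
    }

    /** Succeeds a SingleSubject before anyone subscribes, then subscribes. */
    static List<String> viaSingleSubject() {
        List<String> received = new ArrayList<>();
        SingleSubject<String> subject = SingleSubject.create();

        // onSuccess is a terminal event, so the subject remembers it.
        subject.onSuccess("Start");

        // The late subscriber gets the remembered value, so the flatMap function runs.
        subject.flatMap(s -> Single.just("action for " + s))
               .subscribe(value -> received.add(value));
        return received;
    }

    public static void main(String[] args) {
        System.out.println("PublishProcessor: " + viaPublishProcessor()); // prints "PublishProcessor: []"
        System.out.println("SingleSubject:    " + viaSingleSubject());    // prints "SingleSubject:    [action for Start]"
    }
}
```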
How can the desired behavior be achieved via Processor?
You could reorganize your logic, or use a BehaviorProcessor or ReplayProcessor.createWithSize(1). Note that calling onComplete won't execute the flatMap function either.
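As a rough illustration of that suggestion, the sketch below runs the same "emit first, subscribe later" sequence against three processor types. It assumes RxJava 2; the class CachingProcessorDemo, the helper startThenSubscribe, and the "action for ..." string are hypothetical and only stand in for start(), getFlowable() and createAction().

```java
import io.reactivex.Flowable;
import io.reactivex.processors.BehaviorProcessor;
import io.reactivex.processors.FlowableProcessor;
import io.reactivex.processors.PublishProcessor;
import io.reactivex.processors.ReplayProcessor;

import java.util.ArrayList;
import java.util.List;

final class CachingProcessorDemo {

    /**
     * Mimics the order of calls in the question: the trigger is emitted
     * first (start()), and the flatMap chain is subscribed only afterwards.
     */
    static List<String> startThenSubscribe(FlowableProcessor<String> trigger) {
        List<String> received = new ArrayList<>();

        trigger.onNext("processCommand"); // emitted before any subscriber exists

        trigger.flatMap(s -> Flowable.just("action for " + s)) // stand-in for createAction()
               .subscribe(value -> received.add(value));
        return received;
    }

    public static void main(String[] args) {
        // BehaviorProcessor caches the latest item and hands it to new subscribers.
        System.out.println(startThenSubscribe(BehaviorProcessor.<String>create()));
        // ReplayProcessor.createWithSize(1) keeps the last item for late subscribers.
        System.out.println(startThenSubscribe(ReplayProcessor.<String>createWithSize(1)));
        // PublishProcessor caches nothing, so the late subscriber receives no item.
        System.out.println(startThenSubscribe(PublishProcessor.<String>create()));
    }
}
```

With either caching processor the late subscriber still receives "processCommand", so the flatMap function is invoked. One design point to keep in mind: unlike SingleSubject, these processors do not terminate after the first item, so the resulting Flowable stays open until onComplete is called on the processor.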