PublishProcessor flatMap operator does not get executed (rxjava-2)

I have the following code:

/**
*  Request wrapped around flowable.
*/
public abstract class RequestFlowable<T> {

    private final PublishProcessor<String> mPublish;
    private String mName;

    public RequestFlowable(String name) {
        mName = name;
        mPublish = PublishProcessor.create();
    }

    public Flowable<T> getFlowable() {
        //return createAction();
        return mPublish.compose(new FlowableTransformer<String, T>() {
            @Override
            public Publisher<T> apply(@NonNull Flowable<String> upstream) {
                return createAction();
            }
        });
      /*
       return mPublish.flatMap(new Function<String, Publisher<? extends T>>() {
            @Override
            public Publisher<? extends T> apply(@NonNull String s) throws Exception {
                return createAction();
            }
        });
       */

    }

    protected abstract Flowable<T> createAction();


    public String getName() {
        return mName;
    }

    public void start() {
        mPublish.onNext("processCommand");
    }

    @Override
    public String toString() {
        return "Request: " + mName;
    }
}

And now the Single version (Edit 2):

public abstract class Request<T> {
    private final SingleSubject<Object> mPublish;
    private String mName;

    public Request(String name) {
        mName = name;
        mPublish = SingleSubject.create();

    }

    public Single<T> getSingle() {
        return mPublish.flatMap(o -> createAction());
    }

    protected abstract Single<? extends T> createAction();


    public String getName() {
        return mName;
    }


    public void start() {
        mPublish.onSuccess("Start");
    }

    @Override
    public String toString() {
        return "Request: " + mName;
    }
}

The code above works when it is used with compose, as shown. However, if I use the commented-out code instead (the flatMap variant), createAction does not get executed for some reason.

Edit 2

The code above is called from another class. The corresponding code is attached below (only the important parts of the class are included):

 public class RequestQueue implements RequestController {
    private static final String TAG = RequestQueue.class.getSimpleName();
    private PublishSubject<Request> mRequest;
    private PublishSubject<RequestFlowable> mRequestFlowable;

    @Override
    public <T> Single<T> registerRequest(Request<T> request) {
        mRequest.onNext(request);
        return request.getSingle();
    }

    @Override
    public <T> Flowable<T> registerRequestFlowable(RequestFlowable<T> request) {
        mRequestFlowable.onNext(request);
        return request.getFlowable();
    }

    public RequestQueue() {
        mRequest = PublishSubject.create();
        mRequestFlowable = PublishSubject.create();
        mRequest.subscribe(this::actionOnRequest);
        mRequestFlowable.subscribe(this::actionOnRequest);
    }

    private void actionOnRequest(Request request) {
        Log.d(TAG, "actionOnRequest() called with: request = [" + request + "]");
        request.start();
    }


    private void actionOnRequest(RequestFlowable request) {
        Log.d(TAG, "actionOnRequest() called with: request = [" + request + "]");
        request.start();
    }
}

(From my comments:)

Why does Single work?

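SingleSubject retains the single terminal event it has received. Since it can only receive onSuccess or onError, it will "replay" that event to late subscribers (this is also the reason there is no separate ReplaySingleSubject). When you call onSuccess on the SingleSubject, the value is remembered and re-emitted when the subscription happens later, which calls your createAction. PublishProcessor also remembers its terminal events, but onNext is not a terminal event, so it is dropped when there is no consumer.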
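To make the ordering problem concrete, here is a minimal plain-Java sketch of the two behaviors. It deliberately does not use RxJava: `PublishHub` and `SingleCache` are hypothetical stand-ins that only model "drop when nobody is subscribed" versus "cache the terminal value", and they are not how the library is implemented.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

final class LateSubscriberSketch {

    /** Hypothetical publish-style hub: a value emitted with no subscribers is lost. */
    static final class PublishHub<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
        }

        void onNext(T value) {
            // Nothing is stored; with an empty subscriber list this loop does nothing.
            for (Consumer<T> s : subscribers) {
                s.accept(value);
            }
        }
    }

    /** Hypothetical single-value subject: the success value is cached and replayed. */
    static final class SingleCache<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();
        private T value;
        private boolean done;

        void subscribe(Consumer<T> subscriber) {
            if (done) {
                subscriber.accept(value); // a late subscriber still gets the value
            } else {
                subscribers.add(subscriber);
            }
        }

        void onSuccess(T v) {
            if (done) {
                return; // only the first terminal event counts
            }
            value = v;
            done = true;
            for (Consumer<T> s : subscribers) {
                s.accept(v);
            }
            subscribers.clear();
        }
    }

    /** Emits first and subscribes afterwards, as registerRequest() does in the question. */
    public static List<String> run() {
        List<String> log = new ArrayList<>();

        PublishHub<String> publish = new PublishHub<>();
        publish.onNext("processCommand");                    // start() before anyone subscribed
        publish.subscribe(v -> log.add("publish got " + v)); // too late, never invoked

        SingleCache<String> single = new SingleCache<>();
        single.onSuccess("Start");                           // start() before anyone subscribed
        single.subscribe(v -> log.add("single got " + v));   // cached value is replayed

        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [single got Start]
    }
}
```

Only the single-value variant reaches its subscriber, which mirrors why the flatMap function on the PublishProcessor never runs: RequestQueue calls start() before the caller has subscribed to the returned Flowable.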

How can the desired behavior be achieved via Processor?

You could reorganize your logic, or use BehaviorProcessor or ReplayProcessor.createWithSize(1). Calling onComplete will not execute the flatMap function either.
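The effect of a size-1 replay can be illustrated with another plain-Java sketch. `ReplayLastHub` and `subscribeMapped` are hypothetical names that only model "remember the latest value and hand it to late subscribers", with a synchronous mapper standing in for the flatMap step. The real ReplayProcessor and BehaviorProcessor do considerably more (backpressure, terminal events, thread safety).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

final class ReplayLastSketch {

    /** Hypothetical model of a size-1 replay hub. */
    static final class ReplayLastHub<T> {
        private final List<Consumer<T>> subscribers = new ArrayList<>();
        private T latest;
        private boolean hasValue;

        void onNext(T value) {
            latest = value; // remembered even if nobody is listening yet
            hasValue = true;
            for (Consumer<T> s : subscribers) {
                s.accept(value);
            }
        }

        void subscribe(Consumer<T> subscriber) {
            subscribers.add(subscriber);
            if (hasValue) {
                subscriber.accept(latest); // replay to the late subscriber
            }
        }

        /** Simplified, synchronous stand-in for flatMap: map each delivered value. */
        <R> void subscribeMapped(Function<T, R> mapper, Consumer<R> downstream) {
            subscribe(v -> downstream.accept(mapper.apply(v)));
        }
    }

    /** start() fires before the subscription, yet the mapper still runs. */
    public static List<String> run() {
        List<String> log = new ArrayList<>();
        ReplayLastHub<String> hub = new ReplayLastHub<>();

        hub.onNext("processCommand"); // emitted with no subscribers

        hub.subscribeMapped(
                trigger -> {
                    log.add("createAction called"); // stands in for createAction()
                    return "result";
                },
                result -> log.add("received " + result));
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [createAction called, received result]
    }
}
```

In the RequestFlowable class from the question, the corresponding change would presumably be to create mPublish with ReplayProcessor.createWithSize(1) or BehaviorProcessor.create() and to widen the field type accordingly (for example to FlowableProcessor<String>), then switch getFlowable() back to the flatMap variant. This is a suggestion under those assumptions and has not been tested against the original project.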