在层中连接 RxJava Observables

Connecting RxJava Observables in layers

我的应用程序中有 3 个图层。 Layer1 从 layer2 订阅 Observable。 Layer2 订阅 layer3 以便将返回的数据发送到 layer1。

第一层

layer2.getData(data).subscribe(newData -> {Log.d("onNext", "returned");},
                    throwable -> {Log.d("onError", throwable.getMessage());});

假设 layer3 有一个名为 downloadDataFromApi(data) 的方法;

public Observable<Data> getData(String data) {
    return  Observable.create(new Observable.OnSubscribe<Data>() {
        @Override
        public void call(Subscriber<? super Data> subscriber) {
            Data data = new Data();
            subscriber.onNext(data);
            subscriber.onCompleted();
            // Can't find a way to connect to layer3.
        }
    });
}

layer2的getData()方法需要做什么?我基本上想要在将 Observable 返回到 layer1 之前有逻辑。

这有意义吗?

据我所知,您有 3 层(表示、业务逻辑、数据访问)。

那么您可以执行以下操作:

class PresentationLayer {

    private BusinessLogicLayer layer;

    PresentationLayer() {
        layer = new BusinessLogicLayer();
    }

    public void showName() {
        layer.getNameWithoutRxPrefix()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Consumer<String>() {
                    @Override
                    public void accept(String name) throws Exception {
                        // show name somewhere
                        Log.d("PresentationLayer", "name: " +  name);
                    }
                });
    }
}

class BusinessLogicLayer {

    private DataAccessLayer layer;

    BusinessLogicLayer() {
        layer = new DataAccessLayer();
    }

    public Observable<String> getNameWithoutRxPrefix() {
        return layer.getName()
                .map(new Function<String, String>() {
                    @Override
                    public String apply(String name) throws Exception {
                        return name.replace("Rx", "");
                    }
                });
    }
}

class DataAccessLayer {

    public Observable<String> getName() {
        return Observable.just("RxAndroid");
    }
}

如您所见,我在我的数据访问层 (getName) 中 return 一个 Observable,然后在我的业务逻辑方法 (map) 中将另一个方法链接到它之前return将其传输到表示层。

只需 return 直接 Observable。然后 layer1 照常处理订阅。

class Layer2 {
    public Observable<Data> getData(String data) {
        return layer3.getData(data);
    }
}