使用 RxJava 和 Retrofit 进行 N 次顺序 api 调用

Making N sequential api calls using RxJava and Retrofit

我有一个文件列表,我想从 Android 设备上传到后端。由于内存限制,我想仅在第一个完成后进行第二个 API 调用,在第二个完成后进行第三个调用,依此类推。

我写了类似

private Observable<Integer> uploadFiles(List<File> files) {
        return Observable.create(subscriber -> {
            for (int i = 0, size = files.size(); i < size; i++) {
                UploadModel uploadModel = new UploadModel(files.get(0));
                int uploadResult = retrofitApi.uploadSynchronously(uploadModel);
                subscriber.onNext(uploadResult);
            }
            subscriber.onCompleted();
        }).subscribeOn(Schedulers.newThread());
    }

但我觉得这可能违背了 Rx 的精神,俗话说如果你使用 Observable.create,你可能做错了...... 这是一个合理的方法吗?有没有更好的方法通过 Retrofit 的 RxJava 集成来实现这一点?

天真地,我会这样做(但它不起作用,见下文):

return Observable.from(files).concatMap(file -> retrofitApi.upload(uploadModel));

现在的问题是没有办法告诉改造只使用一个线程进行这些调用。

然而,

reduce 将一个函数调用的结果连同来自原始可观察对象的下一个发射值一起传递给下一个函数调用。这可行,但传递给 reduce 的函数需要同步。不好。

另一种方法是递归修改 observable:

void getNextFile(int i) {
    return retrofit.upload(i).
        onNext(result -> getNextFile(i + 1));
}

大致如此。但我不确定如何清理它以使其更具可读性。

我认为最干净的是这样的:

Observable.from(files).map(file -> retrofitApi.uploadSynchronously(new UploadModel(file)));

RxJava 的本地人会发出 Observable.from(...) 中的所有项目,就好像并行一样。这是将其视为平行发射的最佳方式。然而,有些情况需要整个链的实际后续执行。我找到了以下解决方案,可能不是最好的解决方案,但很有效。

import rx.Observable;
import rx.Subscriber;

import java.util.Iterator;
import java.util.function.Function;

public class Rx {
    public static void ignore(Object arg) {
    }

    public static <E, R> Observable<Void> sequential(Iterator<E> iterator, Function<E, Observable<R>> action) {
        return Observable.create(collectorSubscriber ->
                Observable.<Void>create(producerSubscriber ->
                        producerSubscriber.setProducer(ignoredCount -> {
                            if (!iterator.hasNext()) {
                                producerSubscriber.onCompleted();
                                return;
                            }

                            E model = iterator.next();
                            action.apply(model)
                                    .subscribe(
                                            Rx::ignore,
                                            producerSubscriber::onError,
                                            () -> producerSubscriber.onNext(null));
                        }))
                        .subscribe(new Subscriber<Void>() {
                            @Override
                            public void onStart() {
                                request(1);
                            }

                            @Override
                            public void onCompleted() {
                                collectorSubscriber.onNext(null);
                                collectorSubscriber.onCompleted();
                            }

                            @Override
                            public void onError(Throwable e) {
                                collectorSubscriber.onError(e);
                            }

                            @Override
                            public void onNext(Void aVoid) {
                                request(1);
                            }
                        }));
    }
}

示例用法为:

    Iterator<? extends Model> iterator = models.iterator();

    Rx.sequential(iterator, model -> someFunctionReturnsObservable(model))
            .subscribe(...);

此方法保证

的链式执行

Observable<Dummy> someFunctionReturnsObservable(Model model)

目前创建可观察对象的首选方法是使用 fromAsync:

Observable.fromAsync(new Action1<AsyncEmitter<Object>>()
    {
        @Override
        public void call(final AsyncEmitter<Object> emitter)
        {
            emitter.onNext(object);
            emitter.onCompleted();

            emitter.setCancellation(new AsyncEmitter.Cancellable()
            {
                @Override
                public void cancel() throws Exception
                {
                    // on unSubscribe() callback
                }
            });
        }
    }, AsyncEmitter.BackpressureMode.BUFFER);