requery db & rxjava observableResult 多次调用后续函数

requery db & rxjava observableResult calls subsequent function multiple times

在 android 我正在使用重新查询数据库并尝试将我的数据库更改上传到服务器。为了达到同样的目的,我运行下面的逻辑

Scheduler sub2 = Schedulers.newThread();
Scheduler ob2 = Schedulers.newThread();
data.select(Broadcaster.class)
            .where(Broadcaster.IS_DIRTY.eq(true))
            .get()
            .observableResult()
            .subscribeOn(sub2)
            .observeOn(ob2)
            .flatMap(broadcasters->broadcasters.observable())
            .flatMap(broadcasters->Backend.getInstance()
                   .uploadBroadcaster(broadcasters)
                    .onExceptionResumeNext(Observable.empty()))
            .flatMapSingle(broadcaster -> markUploaded(broadcaster))
            .doOnError(t->Log.e(TAG,"Error uploading ",t))
            .subscribe();

但对于每次更改,uploadBroadcaster 都会使用相同的数据调用多次(4-10 次)。 我在这里做错了什么。

错误是我的。此代码块被多次调用。所以订阅发生了多次。