使用 RxJava 在单个线程上链接请求

Using RxJava to chain request on a single thread

我正在将用户的位置保存在应用本地数据库中,然后将其发送到服务器。服务器 return 成功后,我删除发送的位置。

每次在数据库中保存一个点时,我都会调用此方法:

public void sendPoint(){
    amazonRetrofit.postAmazonPoints(databaseHelper.getPoints())
            .map(listIdsSent -> deleteDatabasePoints(listIdsSent))
            .doOnCompleted(() -> emitStoreChange(finalEvent))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(AndroidSchedulers.from(backgroundLooper))
            .subscribe();
}
  1. 我在数据库中查询要发送到服务器的点
  2. 我收到服务器发送的点列表成功
  3. 使用.map(),我收集成功发送的点并将它们从本地数据库中删除

有时候,我会反复调用这个方法,没有等到上一个请求完成就删除了发送的点。因此,当我再次调用该方法时,它将 post 与前一个请求相同的点,因为前一个请求尚未完成,因此尚未使用 .map() 删除该点。导致服务器接收重复...

时间轴


结果:

服务器数据库现在已经收到:A,B,C,A,B,C,D

每个请求都按顺序发生,但是当我调用 sendPoint() 太快时,相同的位置点会以某种方式发送到服务器。我该如何解决这个问题?

您应该在客户端 or/and 在后端进行某种验证。

客户端:
最简单的解决方案是在 table 中添加两列,位置如 "processing" 和 "uploaded"。 当您 select 来自数据库和子句的位置 where processing=false and uploaded=false。 然后当您准备好发送行时设置 processing=true 并且当服务器 returns 成功设置 done=true.

后端(可选,取决于要求):
您应该将带有时间戳的位置发送到服务器(可能在您的客户端 table 中再增加一列)。如果服务器获得的位置的时间戳比数据库中最后一个时间戳更早,则不应存储它。

RxJava 解决方案:
您可以使用内存缓存实现类似的解决方案,内存缓存保留在所有 sendPoint 周围,如 List.

伪代码:

public void sendPoint(){
      databaseHelper.getPoints()
            .filter(points -> pointsNotInCache())
            .map(points -> amazonRetrofit.postAmazonPoints())
            .map(points -> addToCache()) 
            .map(listIdsSent -> deleteDatabasePoints(listIdsSent))
            .map(listIdsSent -> removeSentPointsFromCache()) //if you would like save memory
            .doOnCompleted(() -> emitStoreChange(finalEvent))
            .observeOn(AndroidSchedulers.mainThread())
            .subscribeOn(AndroidSchedulers.from(backgroundLooper))
            .subscribe();
}

首先,您没有正确使用 observerOn 运算符,一旦定义,observeOn 运算符将应用于管道中的步骤。 因此,如果您在 subscribeOn 之前的管道末尾进行定义,那么您之前的步骤 none 将在该线程中执行。

此外,由于您需要等到服务器调用的响应,您可以使用订阅者已经提供的回调处理程序(onNext()、onComplete())

 public void sendPoint(){
    Observable.from(databaseHelper.getPoints())
              .observeOn(AndroidSchedulers.mainThread())
              .flatMap(poins-> amazonRetrofit.postAmazonPoints(points))
              .subscribeOn(AndroidSchedulers.from(backgroundLooper))
              .subscribe(listIdsSent-> deleteDatabasePoints(listIdsSent), () -> emitStoreChange(finalEvent));
}

如果您想查看更多 ObserverOn 和 SubscribeOn 的示例,可以查看此处。 https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java

看起来,正如其他人所说,您需要一个中间缓存。

HashSet<Point> mHashSet = new HashSet<>();

public void sendPoint() {
    Observable.from(databaseHelper.getPoints())
        .filter(point -> !mHashSet.contains(point))
        .doOnNext(mHashSet::put)
        .toList()
        .flatMap(amazonRetrofit::postAmazonPoints)
        .map(this::deleteDatabasePoints)
        .doOnCompleted(() -> emitStoreChange(finalEvent))
        .observeOn(AndroidSchedulers.mainThread())
        .subscribeOn(AndroidSchedulers.from(backgroundLooper))
        .subscribe();
}