使用 RxJava 在单个线程上链接请求
Using RxJava to chain request on a single thread
我正在将用户的位置保存在应用本地数据库中,然后将其发送到服务器。服务器 return 成功后,我删除发送的位置。
每次在数据库中保存一个点时,我都会调用此方法:
public void sendPoint(){
amazonRetrofit.postAmazonPoints(databaseHelper.getPoints())
.map(listIdsSent -> deleteDatabasePoints(listIdsSent))
.doOnCompleted(() -> emitStoreChange(finalEvent))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe();
}
- 我在数据库中查询要发送到服务器的点
- 我收到服务器发送的点列表成功
- 使用
.map()
,我收集成功发送的点并将它们从本地数据库中删除
有时候,我会反复调用这个方法,没有等到上一个请求完成就删除了发送的点。因此,当我再次调用该方法时,它将 post 与前一个请求相同的点,因为前一个请求尚未完成,因此尚未使用 .map()
删除该点。导致服务器接收重复...
时间轴
- 第一次调用
postPoint()
- 从数据库中检索点 A,B,C
- Post指向A,B,C到服务器
- 第二次调用
postPoint()
- 从数据库中检索点 A,B,C,D
- Post指向A,B,C,D到服务器
- 第一个请求成功
- 正在从本地数据库中删除 A、B、C
- 第二次请求成功
- 正在从本地数据库中删除 A、B、C、D
结果:
服务器数据库现在已经收到:A,B,C,A,B,C,D
每个请求都按顺序发生,但是当我调用 sendPoint()
太快时,相同的位置点会以某种方式发送到服务器。我该如何解决这个问题?
您应该在客户端 or/and 在后端进行某种验证。
客户端:
最简单的解决方案是在 table 中添加两列,位置如 "processing" 和 "uploaded"。
当您 select 来自数据库和子句的位置 where processing=false and uploaded=false
。
然后当您准备好发送行时设置 processing=true
并且当服务器 returns 成功设置 done=true
.
后端(可选,取决于要求):
您应该将带有时间戳的位置发送到服务器(可能在您的客户端 table 中再增加一列)。如果服务器获得的位置的时间戳比数据库中最后一个时间戳更早,则不应存储它。
RxJava 解决方案:
您可以使用内存缓存实现类似的解决方案,内存缓存保留在所有 sendPoint
周围,如 List
.
伪代码:
public void sendPoint(){
databaseHelper.getPoints()
.filter(points -> pointsNotInCache())
.map(points -> amazonRetrofit.postAmazonPoints())
.map(points -> addToCache())
.map(listIdsSent -> deleteDatabasePoints(listIdsSent))
.map(listIdsSent -> removeSentPointsFromCache()) //if you would like save memory
.doOnCompleted(() -> emitStoreChange(finalEvent))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe();
}
首先,您没有正确使用 observerOn 运算符,一旦定义,observeOn 运算符将应用于管道中的步骤。
因此,如果您在 subscribeOn 之前的管道末尾进行定义,那么您之前的步骤 none 将在该线程中执行。
此外,由于您需要等到服务器调用的响应,您可以使用订阅者已经提供的回调处理程序(onNext()、onComplete())
public void sendPoint(){
Observable.from(databaseHelper.getPoints())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(poins-> amazonRetrofit.postAmazonPoints(points))
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe(listIdsSent-> deleteDatabasePoints(listIdsSent), () -> emitStoreChange(finalEvent));
}
如果您想查看更多 ObserverOn 和 SubscribeOn 的示例,可以查看此处。 https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java
看起来,正如其他人所说,您需要一个中间缓存。
即
HashSet<Point> mHashSet = new HashSet<>();
public void sendPoint() {
Observable.from(databaseHelper.getPoints())
.filter(point -> !mHashSet.contains(point))
.doOnNext(mHashSet::put)
.toList()
.flatMap(amazonRetrofit::postAmazonPoints)
.map(this::deleteDatabasePoints)
.doOnCompleted(() -> emitStoreChange(finalEvent))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe();
}
我正在将用户的位置保存在应用本地数据库中,然后将其发送到服务器。服务器 return 成功后,我删除发送的位置。
每次在数据库中保存一个点时,我都会调用此方法:
public void sendPoint(){
amazonRetrofit.postAmazonPoints(databaseHelper.getPoints())
.map(listIdsSent -> deleteDatabasePoints(listIdsSent))
.doOnCompleted(() -> emitStoreChange(finalEvent))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe();
}
- 我在数据库中查询要发送到服务器的点
- 我收到服务器发送的点列表成功
- 使用
.map()
,我收集成功发送的点并将它们从本地数据库中删除
有时候,我会反复调用这个方法,没有等到上一个请求完成就删除了发送的点。因此,当我再次调用该方法时,它将 post 与前一个请求相同的点,因为前一个请求尚未完成,因此尚未使用 .map()
删除该点。导致服务器接收重复...
时间轴
- 第一次调用
postPoint()
- 从数据库中检索点 A,B,C
- Post指向A,B,C到服务器
- 第二次调用
postPoint()
- 从数据库中检索点 A,B,C,D
- Post指向A,B,C,D到服务器
- 第一个请求成功
- 正在从本地数据库中删除 A、B、C
- 第二次请求成功
- 正在从本地数据库中删除 A、B、C、D
结果:
服务器数据库现在已经收到:A,B,C,A,B,C,D
每个请求都按顺序发生,但是当我调用 sendPoint()
太快时,相同的位置点会以某种方式发送到服务器。我该如何解决这个问题?
您应该在客户端 or/and 在后端进行某种验证。
客户端:
最简单的解决方案是在 table 中添加两列,位置如 "processing" 和 "uploaded"。
当您 select 来自数据库和子句的位置 where processing=false and uploaded=false
。
然后当您准备好发送行时设置 processing=true
并且当服务器 returns 成功设置 done=true
.
后端(可选,取决于要求):
您应该将带有时间戳的位置发送到服务器(可能在您的客户端 table 中再增加一列)。如果服务器获得的位置的时间戳比数据库中最后一个时间戳更早,则不应存储它。
RxJava 解决方案:
您可以使用内存缓存实现类似的解决方案,内存缓存保留在所有 sendPoint
周围,如 List
.
伪代码:
public void sendPoint(){
databaseHelper.getPoints()
.filter(points -> pointsNotInCache())
.map(points -> amazonRetrofit.postAmazonPoints())
.map(points -> addToCache())
.map(listIdsSent -> deleteDatabasePoints(listIdsSent))
.map(listIdsSent -> removeSentPointsFromCache()) //if you would like save memory
.doOnCompleted(() -> emitStoreChange(finalEvent))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe();
}
首先,您没有正确使用 observerOn 运算符,一旦定义,observeOn 运算符将应用于管道中的步骤。 因此,如果您在 subscribeOn 之前的管道末尾进行定义,那么您之前的步骤 none 将在该线程中执行。
此外,由于您需要等到服务器调用的响应,您可以使用订阅者已经提供的回调处理程序(onNext()、onComplete())
public void sendPoint(){
Observable.from(databaseHelper.getPoints())
.observeOn(AndroidSchedulers.mainThread())
.flatMap(poins-> amazonRetrofit.postAmazonPoints(points))
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe(listIdsSent-> deleteDatabasePoints(listIdsSent), () -> emitStoreChange(finalEvent));
}
如果您想查看更多 ObserverOn 和 SubscribeOn 的示例,可以查看此处。 https://github.com/politrons/reactive/blob/master/src/test/java/rx/observables/scheduler/ObservableAsynchronous.java
看起来,正如其他人所说,您需要一个中间缓存。
即
HashSet<Point> mHashSet = new HashSet<>();
public void sendPoint() {
Observable.from(databaseHelper.getPoints())
.filter(point -> !mHashSet.contains(point))
.doOnNext(mHashSet::put)
.toList()
.flatMap(amazonRetrofit::postAmazonPoints)
.map(this::deleteDatabasePoints)
.doOnCompleted(() -> emitStoreChange(finalEvent))
.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(AndroidSchedulers.from(backgroundLooper))
.subscribe();
}