Rxjava 和工作管理器链式异步调用
Rxjava and work manager chained asychronous calls
更新:这是我的旧 insertIntoDb 方法的样子,但它不起作用:
private Completable insertIntoDb(List<ArticleEntity> articleItemEntities) {
return database.articleDao().insertArticles(articleItemEntities)
.observeOn(Schedulers.io());
}
我将其更改为以下内容,现在可以使用了:
private void insertIntoDbNew(List<ArticleEntity> articleItemEntities) {
mCompositeDisposable.add(
database.articleDao().insertArticles(articleItemEntities)
.subscribeOn(Schedulers.io())
.subscribe());
}
我不知道为什么,但现在可以了。确保工作人员在数据库插入完成之前完成,但这似乎不是我之前认为的问题。
更新结束。
我是响应式编程的新手。我的目标是安排工作经理执行 4 个操作,然后使用 RxJava2 return 结果。这是我要执行的任务
- 进行网络 api 通话。
- 构造我们从 API 调用中获得的数据。
- 将其插入我们的本地房间数据库。
- 当一切都完成时发信号
Result.success()
返回作业,这样它就知道一切正常并可以终止。
所以我的首选方法看起来像这样。
public Result doWork(){
return api.get("URL") responseData -> structureData(responseData) structuredData -> insertIntoDB(structuredData) -> Result.success()
}
我正在使用 RxJava2 和 RxWorker class。
以下是我目前的解决方案。这是正确的还是我做错了什么?
public class DownloadWorker extends RxWorker {
@Override
public Single<Result> createWork() {
return apiService.download("URL")
.map(response -> processResponse(response))
.doOnSuccess(data -> insertIntoDb(data))
.flatMap(response ->
allComplete()
)
.observeOn(Schedulers.io());
}
Single<Result> allComplete() {
return Single.just(Result.success());
}
}
它的行为就像我想要的那样。它下载数据,构建数据,然后将其插入数据库,然后 returns Result.success()
。但我不知道我在做什么。我是否按预期使用了 RxJava?
这部分也让我很困扰:
.flatMap(response -> allComplete())
响应部分是多余的,我能以某种方式删除它吗?
我对你的代码做了一些改进:
public class DownloadWorker extends RxWorker {
@Override
public Single<Result> createWork() {
return apiService.download("URL")
.map(response -> processResponse(response))
.flatMapCompletable(articleItemEntities -> database.articleDao().insertArticles(articleItemEntities))
.toSingleDefault(Result.success())
.onErrorReturnItem(Result.failure())
.observeOn(Schedulers.io());
}
}
在原始代码中,您使用 doOnSuccess
方法保存数据,这是一个副作用。正如您在评论中提到的,insertIntoDb()
方法 returns Completable
。因此,我将 doOnSuccess(data -> insertIntoDb(data))
更改为 flatMapCompletable(data -> insertIntoDb(data))
这将允许您确保存储数据成功并等待它完成。由于 insertIntoDb()
方法 returns Completable
和 createWork()
方法必须 return Result
,我们现在必须从 Completable
更改类型至 Single<Result>
。因此,我默认使用 toSingleDefault
which returns Result.success()
。此外,我添加了 onErrorReturnItem(Result.failure())
,这将允许 RxWorker 跟踪错误。希望我的回答对您有所帮助。
更新:这是我的旧 insertIntoDb 方法的样子,但它不起作用:
private Completable insertIntoDb(List<ArticleEntity> articleItemEntities) {
return database.articleDao().insertArticles(articleItemEntities)
.observeOn(Schedulers.io());
}
我将其更改为以下内容,现在可以使用了:
private void insertIntoDbNew(List<ArticleEntity> articleItemEntities) {
mCompositeDisposable.add(
database.articleDao().insertArticles(articleItemEntities)
.subscribeOn(Schedulers.io())
.subscribe());
}
我不知道为什么,但现在可以了。确保工作人员在数据库插入完成之前完成,但这似乎不是我之前认为的问题。
更新结束。
我是响应式编程的新手。我的目标是安排工作经理执行 4 个操作,然后使用 RxJava2 return 结果。这是我要执行的任务
- 进行网络 api 通话。
- 构造我们从 API 调用中获得的数据。
- 将其插入我们的本地房间数据库。
- 当一切都完成时发信号
Result.success()
返回作业,这样它就知道一切正常并可以终止。
所以我的首选方法看起来像这样。
public Result doWork(){
return api.get("URL") responseData -> structureData(responseData) structuredData -> insertIntoDB(structuredData) -> Result.success()
}
我正在使用 RxJava2 和 RxWorker class。
以下是我目前的解决方案。这是正确的还是我做错了什么?
public class DownloadWorker extends RxWorker {
@Override
public Single<Result> createWork() {
return apiService.download("URL")
.map(response -> processResponse(response))
.doOnSuccess(data -> insertIntoDb(data))
.flatMap(response ->
allComplete()
)
.observeOn(Schedulers.io());
}
Single<Result> allComplete() {
return Single.just(Result.success());
}
}
它的行为就像我想要的那样。它下载数据,构建数据,然后将其插入数据库,然后 returns Result.success()
。但我不知道我在做什么。我是否按预期使用了 RxJava?
这部分也让我很困扰:
.flatMap(response -> allComplete())
响应部分是多余的,我能以某种方式删除它吗?
我对你的代码做了一些改进:
public class DownloadWorker extends RxWorker {
@Override
public Single<Result> createWork() {
return apiService.download("URL")
.map(response -> processResponse(response))
.flatMapCompletable(articleItemEntities -> database.articleDao().insertArticles(articleItemEntities))
.toSingleDefault(Result.success())
.onErrorReturnItem(Result.failure())
.observeOn(Schedulers.io());
}
}
在原始代码中,您使用 doOnSuccess
方法保存数据,这是一个副作用。正如您在评论中提到的,insertIntoDb()
方法 returns Completable
。因此,我将 doOnSuccess(data -> insertIntoDb(data))
更改为 flatMapCompletable(data -> insertIntoDb(data))
这将允许您确保存储数据成功并等待它完成。由于 insertIntoDb()
方法 returns Completable
和 createWork()
方法必须 return Result
,我们现在必须从 Completable
更改类型至 Single<Result>
。因此,我默认使用 toSingleDefault
which returns Result.success()
。此外,我添加了 onErrorReturnItem(Result.failure())
,这将允许 RxWorker 跟踪错误。希望我的回答对您有所帮助。